Tools: Part 6: Decoupled Architecture

## Target Architecture

- Client authenticates with Cognito → gets JWT token
- Client calls API Gateway /uploads with Authorization: Bearer
- API Gateway invokes Lambda Upload Handler
- Lambda stores file/metadata to S3 (uploads/...)
- Lambda publishes an event to Kinesis Data Stream
- Kinesis Consumer Lambda processes and writes to S3 (processed/...) (or DynamoDB, etc.)

This avoids extra moving parts and makes Kinesis actually useful.

## Part A: Create Cognito (User Pool) and Protect API Gateway

## A1) Create a Cognito User Pool

AWS Console → Cognito → Create user pool

- Sign-in options: Email
- MFA: optional (you can enable later)
- App integration: Create App client (no client secret for browser/mobile; for your curl tests it's also easier without a secret)
- Create pool

Record these values; later steps use them:

- User Pool ID (example: us-east-2_XXXX)
- App Client ID

## A2) Create a test user

Cognito → Your User Pool → Users → Create user

- Email + temporary password
- You'll then set a new password on first login (or create the user with a "permanent password" if you want).

## A3) Add Cognito Authorizer in API Gateway

API Gateway → your HTTP API (or REST API)

## If you use HTTP API

- Authorization → Create JWT authorizer
- Issuer URL: https://cognito-idp.<region>.amazonaws.com/<USER_POOL_ID>
- Audience: <APP_CLIENT_ID>
- Attach authorizer to route POST /uploads

## If you use REST API

- Create Cognito User Pool Authorizer
- Attach to method POST /uploads and enable authorization

## A4) Test login to get JWT (from your Mac)

Use the AWS CLI `cognito-idp` commands to authenticate and get an IdToken.

```bash
REGION="us-east-2"
CLIENT_ID="YOUR_APP_CLIENT_ID"
USERNAME="YOUR_TEST_USER_EMAIL"
PASSWORD="YourPermanentPassword"

aws cognito-idp initiate-auth \
  --region "$REGION" \
  --auth-flow USER_PASSWORD_AUTH \
  --client-id "$CLIENT_ID" \
  --auth-parameters USERNAME="$USERNAME",PASSWORD="$PASSWORD"
```

From the output, copy:

- AuthenticationResult.IdToken (this is your JWT)

Now test API Gateway with the auth header:

```bash
API="https://YOUR_API_ID.execute-api.us-east-2.amazonaws.com/uploads"
TOKEN="PASTE_ID_TOKEN_HERE"

curl -i -X POST "$API" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"filename":"test.txt","content":"hello from cognito"}'
```

If you see 401, the authorizer settings (issuer/audience) or the route attachment are wrong.

## Part B: Create Kinesis Stream

## B1) Create Data Stream

AWS Console → Kinesis → Data streams → Create

- Name: uploads-stream
- Capacity mode: On-demand (simplest for lab)

## Part C: Update Upload Lambda to Publish to Kinesis

## C1) Add IAM Permission to Upload Lambda Role

IAM → Roles → your upload lambda role → Add permissions:

Minimum policy (adjust region/account/stream name):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PutToKinesis",
      "Effect": "Allow",
      "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
      "Resource": "arn:aws:kinesis:us-east-2:YOUR_ACCOUNT_ID:stream/uploads-stream"
    }
  ]
}
```

(Keep your existing S3 permissions.)

## C2) Update Lambda Code (Node.js example)

In your Upload Handler Lambda, after writing to S3, send metadata to Kinesis.

```javascript
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
import crypto from "crypto";

const s3 = new S3Client({});
const kinesis = new KinesisClient({});

const UPLOAD_BUCKET = process.env.UPLOAD_BUCKET;
const STREAM_NAME = process.env.STREAM_NAME;

export const handler = async (event) => {
  // For HTTP API, body is JSON string
  const body = event.body ? JSON.parse(event.body) : {};
  const filename = body.filename || `file-${Date.now()}.txt`;
  const content = body.content || "empty";
  const key = `uploads/${filename}`;

  // 1) Save to S3
  await s3.send(new PutObjectCommand({
    Bucket: UPLOAD_BUCKET,
    Key: key,
    Body: content,
    ContentType: "text/plain"
  }));

  // 2) Publish event to Kinesis
  const record = {
    bucket: UPLOAD_BUCKET,
    key,
    uploadedAt: new Date().toISOString(),
    requestId: event.requestContext?.requestId,
    userSub: event.requestContext?.authorizer?.jwt?.claims?.sub // HTTP API JWT authorizer
  };

  await kinesis.send(new PutRecordCommand({
    StreamName: STREAM_NAME,
    PartitionKey: crypto.randomUUID(),
    Data: Buffer.from(JSON.stringify(record))
  }));

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: "Uploaded and queued for processing",
      s3: { bucket: UPLOAD_BUCKET, key }
    })
  };
};
```

## Set Lambda environment variables

Lambda → Configuration → Environment variables:

- UPLOAD_BUCKET = your uploads bucket name
- STREAM_NAME = uploads-stream

## Part D: Create Kinesis Consumer Lambda (Processor)

## D1) Create a new Lambda: kinesis-processor

Runtime: Node.js (or Python)

The execution role needs to:

- Read from Kinesis
- Write to processed S3 bucket (or DynamoDB)

Example IAM permissions:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadKinesis",
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-east-2:YOUR_ACCOUNT_ID:stream/uploads-stream"
    },
    {
      "Sid": "S3ReadWrite",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": [
        "arn:aws:s3:::YOUR_UPLOAD_BUCKET/*",
        "arn:aws:s3:::YOUR_PROCESSED_BUCKET/*"
      ]
    }
  ]
}
```

## D2) Add Event Source Mapping (Kinesis → Lambda)

Lambda → kinesis-processor → Add trigger → Kinesis

- Stream: uploads-stream
- Batch size: 10 (the default is also fine)
- Starting position: LATEST

## D3) Processor Lambda Code (Node.js example)

This reads Kinesis records, fetches each object from S3, and writes the "processed" output to another bucket/prefix.

```javascript
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});
const PROCESSED_BUCKET = process.env.PROCESSED_BUCKET;

// Collect a readable stream (the S3 object body) into a UTF-8 string
const streamToString = async (readable) => {
  const chunks = [];
  for await (const chunk of readable) chunks.push(chunk);
  return Buffer.concat(chunks).toString("utf-8");
};

export const handler = async (event) => {
  for (const r of event.Records) {
    // Kinesis record data arrives base64-encoded
    const payload = JSON.parse(Buffer.from(r.kinesis.data, "base64").toString("utf-8"));
    const { bucket, key } = payload;

    // Read uploaded object
    const obj = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
    const content = await streamToString(obj.Body);

    // "Process" (simple example)
    const processed = content.toUpperCase();

    // Write processed object
    const outKey = key.replace("uploads/", "processed/");
    await s3.send(new PutObjectCommand({
      Bucket: PROCESSED_BUCKET,
      Key: outKey,
      Body: processed,
      ContentType: "text/plain"
    }));
  }
  return { ok: true };
};
```

Set the processor's environment variable:

- PROCESSED_BUCKET = your processed bucket name (or same bucket, different prefix)

## Part E: End-to-End Test

- Get JWT token (A4)
- Send an upload request:

```bash
curl -i -X POST "$API" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"filename":"demo.txt","content":"hello kinesis"}'
```

Then check:

- S3 has uploads/demo.txt
- Kinesis stream has incoming records (Monitoring metrics)
- Processor Lambda logs show it ran
- S3 has processed/demo.txt (content becomes HELLO KINESIS)

## Common Issues (Fast Fixes)

- 401 Unauthorized: authorizer not attached to route/method OR issuer/audience wrong.
- Lambda can't PutRecord: missing kinesis:PutRecord permission OR wrong stream ARN/region.
- Processor not triggering: event source mapping disabled or starting position wrong.
- S3 AccessDenied: processor role missing s3:GetObject or s3:PutObject.
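
When the 401 case is hard to pin down, one option is to decode the token locally and compare its claims with the issuer and audience configured on the authorizer. The sketch below is an illustration only and not part of the lab: `decodeJwtPayload` and `checkAuthorizerClaims` are hypothetical helper names, the sample token is unsigned and built purely for demonstration, and the code assumes a Node.js version that supports the `base64url` Buffer encoding (Node.js 16 or later should). It decodes the payload but does not verify the signature.

```javascript
// Decode (NOT verify) a JWT payload so its claims can be compared with the
// authorizer settings. Signature verification stays with API Gateway.
function decodeJwtPayload(token) {
  const parts = token.split(".");
  if (parts.length !== 3) {
    throw new Error("Expected a JWT with three dot-separated parts");
  }
  // The payload is the second part: base64url-encoded JSON.
  return JSON.parse(Buffer.from(parts[1], "base64url").toString("utf-8"));
}

// Hypothetical helper: returns a list of mismatches between the token's
// claims and the issuer/audience configured on the authorizer.
function checkAuthorizerClaims(token, { issuer, audience }) {
  const claims = decodeJwtPayload(token);
  const problems = [];
  if (claims.iss !== issuer) {
    problems.push(`iss is ${claims.iss}, expected ${issuer}`);
  }
  // Cognito ID tokens carry the app client ID in "aud";
  // access tokens carry it in "client_id" instead.
  const tokenAudience = claims.aud ?? claims.client_id;
  if (tokenAudience !== audience) {
    problems.push(`aud/client_id is ${tokenAudience}, expected ${audience}`);
  }
  if (typeof claims.exp === "number" && claims.exp * 1000 < Date.now()) {
    problems.push("token is expired");
  }
  return problems;
}

// Demonstration with a fake, unsigned token (placeholder values throughout).
const toB64Url = (obj) => Buffer.from(JSON.stringify(obj)).toString("base64url");
const sampleIssuer = "https://cognito-idp.us-east-2.amazonaws.com/us-east-2_XXXX";
const sampleToken = [
  toB64Url({ alg: "none" }),
  toB64Url({
    iss: sampleIssuer,
    aud: "YOUR_APP_CLIENT_ID",
    exp: Math.floor(Date.now() / 1000) + 3600,
  }),
  "signature-not-checked",
].join(".");

const matching = checkAuthorizerClaims(sampleToken, {
  issuer: sampleIssuer,
  audience: "YOUR_APP_CLIENT_ID",
});
const mismatched = checkAuthorizerClaims(sampleToken, {
  issuer: sampleIssuer,
  audience: "SOME_OTHER_CLIENT_ID",
});
console.log(matching.length, mismatched.length); // prints: 0 1
```

To use it against a real token, pass the IdToken from A4 together with the Issuer URL and Audience values entered in A3. An empty result suggests the problem is the route or method attachment rather than the claims.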
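
The processor's record-decoding step can also be exercised locally before wiring up the trigger, which may help separate "the mapping never fired" from "the handler failed on the payload". The following is a minimal sketch under stated assumptions: `makeKinesisEvent` and `decodeKinesisRecords` are hypothetical helpers, the event contains only the fields the D3 code reads (a real Kinesis event carries more), and the bucket name is a placeholder. No AWS calls are made.

```javascript
// Build a trimmed-down version of the event a Kinesis trigger delivers.
// Only the fields read by the processor code are included (assumption).
function makeKinesisEvent(records) {
  return {
    Records: records.map((record) => ({
      eventSource: "aws:kinesis",
      kinesis: {
        partitionKey: "local-test",
        // Kinesis delivers record data to Lambda base64-encoded.
        data: Buffer.from(JSON.stringify(record)).toString("base64"),
      },
    })),
  };
}

// The same decoding step the processor performs on each record.
function decodeKinesisRecords(event) {
  return event.Records.map((r) =>
    JSON.parse(Buffer.from(r.kinesis.data, "base64").toString("utf-8"))
  );
}

// Simulate the record the upload Lambda publishes for demo.txt.
const localEvent = makeKinesisEvent([
  { bucket: "YOUR_UPLOAD_BUCKET", key: "uploads/demo.txt" },
]);
const decodedRecords = decodeKinesisRecords(localEvent);

// Same key rewrite as the processor: uploads/... becomes processed/...
const localOutKey = decodedRecords[0].key.replace("uploads/", "processed/");
console.log(decodedRecords[0].bucket, localOutKey);
// prints: YOUR_UPLOAD_BUCKET processed/demo.txt
```

Passing `localEvent` to the real handler would additionally require AWS credentials and the buckets to exist, so this sketch stops at the decoding and key-mapping logic.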