Tools: Part 6: Decoupled Architecture
2026-03-03
## Target Architecture

Client → Cognito (JWT) → API Gateway → Upload Lambda → S3 + Kinesis → Consumer Lambda → processed output. This avoids extra moving parts and makes Kinesis actually useful.

## Part A — Create Cognito (User Pool) and Protect API Gateway

### A1) Create a Cognito User Pool

AWS Console → Cognito → Create user pool.

### A2) Create a test user

Cognito → Your User Pool → Users → Create user.

### A3) Add Cognito Authorizer in API Gateway

**If you use HTTP API:** API Gateway → your HTTP API → Authorization → Create JWT authorizer, then attach the authorizer to the route POST /uploads.

**If you use REST API:** create a Cognito User Pool Authorizer, then attach it to the POST /uploads method.

### A4) Test login to get a JWT (from your Mac)

Use the AWS CLI `cognito-idp` command to authenticate and get an IdToken:
```bash
REGION="us-east-2"
CLIENT_ID="YOUR_APP_CLIENT_ID"
USERNAME="[email protected]"
PASSWORD="YourPermanentPassword"

aws cognito-idp initiate-auth \
  --region "$REGION" \
  --auth-flow USER_PASSWORD_AUTH \
  --client-id "$CLIENT_ID" \
  --auth-parameters USERNAME="$USERNAME",PASSWORD="$PASSWORD"
```
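The response JSON contains several tokens; the one the JWT authorizer expects is `AuthenticationResult.IdToken`. A minimal Node sketch of pulling it out — the response object here is fabricated for illustration, only its shape matches the real CLI output:

```javascript
// Fabricated shape of the initiate-auth response (values are placeholders).
const response = {
  AuthenticationResult: {
    IdToken: "eyJ...id",          // use this for the Authorization: Bearer header
    AccessToken: "eyJ...access",  // not the token the authorizer below is configured for
    ExpiresIn: 3600
  }
};

const idToken = response.AuthenticationResult.IdToken;
console.log(idToken);
```

From the shell, `aws cognito-idp initiate-auth ... | jq -r '.AuthenticationResult.IdToken'` does the same extraction.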
Now test API Gateway with the auth header:
```bash
API="https://YOUR_API_ID.execute-api.us-east-2.amazonaws.com/uploads"
TOKEN="PASTE_ID_TOKEN_HERE"

curl -i -X POST "$API" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"filename":"test.txt","content":"hello from cognito"}'
```
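When debugging auth failures, it helps to see what is actually inside the token. This sketch decodes — without verifying — a JWT payload so you can compare `iss` and `aud` against the authorizer's Issuer URL and Audience; the token here is fabricated so the example is self-contained:

```javascript
// Decode (NOT verify) a JWT payload. Debugging only — never skip
// signature verification when actually authorizing requests.
const decodeJwtPayload = (token) =>
  JSON.parse(Buffer.from(token.split(".")[1], "base64url").toString("utf-8"));

// Build a fabricated three-part token (header.payload.signature).
const claims = {
  iss: "https://cognito-idp.us-east-2.amazonaws.com/us-east-2_XXXX",
  aud: "YOUR_APP_CLIENT_ID"
};
const fakeToken = [
  "e30", // base64url of "{}"
  Buffer.from(JSON.stringify(claims)).toString("base64url"),
  "fake-signature"
].join(".");

const decoded = decodeJwtPayload(fakeToken);
console.log(decoded.iss); // must match the authorizer's Issuer URL
console.log(decoded.aud); // must match the Audience (App Client ID)
```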
If you see 401, the authorizer settings (issuer/audience) or the route attachment are wrong.

## Part B — Create Kinesis Stream

### B1) Create Data Stream

AWS Console → Kinesis → Data streams → Create.

## Part C — Update Upload Lambda to Publish to Kinesis

### C1) Add IAM Permission to Upload Lambda Role

IAM → Roles → your upload lambda role → Add permissions. Minimum policy (adjust region/account/stream name):
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PutToKinesis",
      "Effect": "Allow",
      "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
      "Resource": "arn:aws:kinesis:us-east-2:YOUR_ACCOUNT_ID:stream/uploads-stream"
    }
  ]
}
```
(Keep your existing S3 permissions.)

### C2) Update Lambda Code (Node.js example)

In your Upload Handler Lambda, after writing to S3, send the metadata to Kinesis:
```javascript
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
import crypto from "crypto";

const s3 = new S3Client({});
const kinesis = new KinesisClient({});

const UPLOAD_BUCKET = process.env.UPLOAD_BUCKET;
const STREAM_NAME = process.env.STREAM_NAME;

export const handler = async (event) => {
  // For HTTP API, body is a JSON string
  const body = event.body ? JSON.parse(event.body) : {};
  const filename = body.filename || `file-${Date.now()}.txt`;
  const content = body.content || "empty";
  const key = `uploads/${filename}`;

  // 1) Save to S3
  await s3.send(new PutObjectCommand({
    Bucket: UPLOAD_BUCKET,
    Key: key,
    Body: content,
    ContentType: "text/plain"
  }));

  // 2) Publish event to Kinesis
  const record = {
    bucket: UPLOAD_BUCKET,
    key,
    uploadedAt: new Date().toISOString(),
    requestId: event.requestContext?.requestId,
    userSub: event.requestContext?.authorizer?.jwt?.claims?.sub // HTTP API JWT authorizer
  };

  await kinesis.send(new PutRecordCommand({
    StreamName: STREAM_NAME,
    PartitionKey: crypto.randomUUID(),
    Data: Buffer.from(JSON.stringify(record))
  }));

  return {
    statusCode: 200,
    body: JSON.stringify({
      message: "Uploaded and queued for processing",
      s3: { bucket: UPLOAD_BUCKET, key }
    })
  };
};
```
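A note on `PartitionKey`: `crypto.randomUUID()` spreads records evenly across shards but gives up per-object ordering. If ordering per uploaded object ever matters, a hypothetical alternative (not in the code above) is to derive the key deterministically from the S3 key:

```javascript
import crypto from "crypto";

// Deterministic partition key: all events for the same S3 object hash to
// the same value, so they land on the same shard and stay ordered.
const partitionKeyFor = (s3Key) =>
  crypto.createHash("sha256").update(s3Key).digest("hex");

const a = partitionKeyFor("uploads/demo.txt");
const b = partitionKeyFor("uploads/demo.txt");
console.log(a === b); // same input → same shard
```

The trade-off: a single very hot object concentrates traffic on one shard, which is why random keys are a reasonable default here.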
Set the Lambda environment variables: Lambda → Configuration → Environment variables.

## Part D — Create Kinesis Consumer Lambda (Processor)

### D1) Create a new Lambda: kinesis-processor

Runtime: Node.js (or Python). Example IAM permissions:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadKinesis",
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:us-east-2:YOUR_ACCOUNT_ID:stream/uploads-stream"
    },
    {
      "Sid": "S3ReadWrite",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": [
        "arn:aws:s3:::YOUR_UPLOAD_BUCKET/*",
        "arn:aws:s3:::YOUR_PROCESSED_BUCKET/*"
      ]
    }
  ]
}
```
### D2) Add Event Source Mapping (Kinesis → Lambda)

Lambda → kinesis-processor → Add trigger → Kinesis.

### D3) Processor Lambda Code (Node.js example)

This reads the Kinesis records, fetches the uploaded object from S3, and writes the "processed" output to another bucket/prefix:
```javascript
import { S3Client, GetObjectCommand, PutObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});
const PROCESSED_BUCKET = process.env.PROCESSED_BUCKET;

const streamToString = async (readable) => {
  const chunks = [];
  for await (const chunk of readable) chunks.push(chunk);
  return Buffer.concat(chunks).toString("utf-8");
};

export const handler = async (event) => {
  for (const r of event.Records) {
    const payload = JSON.parse(Buffer.from(r.kinesis.data, "base64").toString("utf-8"));
    const { bucket, key } = payload;

    // Read uploaded object
    const obj = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
    const content = await streamToString(obj.Body);

    // "Process" (simple example)
    const processed = content.toUpperCase();

    // Write processed object
    const outKey = key.replace("uploads/", "processed/");
    await s3.send(new PutObjectCommand({
      Bucket: PROCESSED_BUCKET,
      Key: outKey,
      Body: processed,
      ContentType: "text/plain"
    }));
  }
  return { ok: true };
};
```
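The `Buffer.from(r.kinesis.data, "base64")` step mirrors the producer's `Buffer.from(JSON.stringify(record))`: Kinesis delivers each record's data to the consumer Lambda base64-encoded. A self-contained round-trip of the two sides (the bucket/key values are made up):

```javascript
// Producer side: what the upload Lambda puts on the stream.
const record = { bucket: "my-upload-bucket", key: "uploads/demo.txt" };
const dataOnWire = Buffer.from(JSON.stringify(record));

// Kinesis hands the consumer base64 text in event.Records[i].kinesis.data:
const base64Data = dataOnWire.toString("base64");

// Consumer side: exactly the processor's first line per record.
const payload = JSON.parse(Buffer.from(base64Data, "base64").toString("utf-8"));
console.log(payload.key); // uploads/demo.txt
```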
## Part E — End-to-End Test

Get a fresh JWT (A4), then upload a file:
```bash
curl -i -X POST "$API" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"filename":"demo.txt","content":"hello kinesis"}'
```
Then verify:

- S3 has uploads/demo.txt
- The Kinesis stream shows incoming records (Monitoring metrics)
- The processor Lambda's logs show it ran
- S3 has processed/demo.txt (content becomes HELLO KINESIS)

## Recap — End-to-End Flow

- Client authenticates with Cognito → gets a JWT token
- Client calls API Gateway POST /uploads with Authorization: Bearer <IdToken>
- API Gateway invokes the Upload Handler Lambda
- Lambda stores the file/metadata in S3 (uploads/...)
- Lambda publishes an event to the Kinesis Data Stream
- The Kinesis Consumer Lambda processes it and writes to S3 (processed/...) (or DynamoDB, etc.)

## Step Details (Quick Reference)

**A1) User Pool**

- Sign-in options: Email
- MFA: optional (you can enable it later)
- App integration: create an App client (no client secret for browser/mobile; it is also easier without a secret for the curl tests)
- Create the pool, then note the User Pool ID (example: us-east-2_XXXX) and the App Client ID

**A2) Test user**

- Email + temporary password
- You will set a new password on first login (or create the user with a permanent password)

**A3) Authorizer — HTTP API**

- Authorization → Create JWT authorizer
- Issuer URL: https://cognito-idp.<region>.amazonaws.com/<USER_POOL_ID>
- Audience: <APP_CLIENT_ID>
- Attach the authorizer to the route POST /uploads

**A3) Authorizer — REST API**

- Create a Cognito User Pool Authorizer
- Attach it to the method POST /uploads and enable authorization

**A4) Token**

- Use AuthenticationResult.IdToken (this is your JWT)

**B1) Stream**

- Name: uploads-stream
- Capacity mode: On-demand (simplest for a lab)

**C2) Upload Lambda environment variables**

- UPLOAD_BUCKET = your uploads bucket name
- STREAM_NAME = uploads-stream

**D1) Processor Lambda**

- Reads from Kinesis
- Writes to the processed S3 bucket (or DynamoDB)

**D2) Trigger**

- Stream: uploads-stream
- Batch size: 10 (the default is OK)
- Starting position: LATEST

**D3) Processor environment variable**

- PROCESSED_BUCKET = your processed bucket name (or the same bucket, different prefix)

## Common Issues (Fast Fixes)

- 401 Unauthorized: the authorizer is not attached to the route/method, or the issuer/audience is wrong.
- Lambda can't PutRecord: missing kinesis:PutRecord permission, or wrong stream ARN/region.
- Processor not triggering: event source mapping disabled or starting position wrong.
- S3 AccessDenied: the processor role is missing s3:GetObject or s3:PutObject.