# From 10-Minute Blocking APIs to Async Pipelines: A Production Backend Redesign
2026-02-21
admin
When I took over a transaction ingestion system running in production, one of the first things I noticed was the time it took to process CSV uploads. Users regularly uploaded files averaging around 50,000 rows, with some reaching over 600,000. The upload API handled everything synchronously: parsing the file, inserting rows into Cassandra, fetching historical prices from an external service per transaction, performing reconciliation, calculating balances and tax metrics, and only then returning a response. In practice, requests could take anywhere from five to ten minutes to complete, and frontend and Nginx timeouts had been extended to accommodate this behavior. The system technically worked, but only by allowing long blocking requests.

It became clear that this wasn't just a performance issue. The architecture itself was tightly coupled to the request lifecycle: heavy compute and IO operations were happening directly inside the API path. Synchronous designs often work well at small scale, but as data size increases, the bottlenecks become harder to ignore.

## What the Original Flow Looked Like

At a high level, the upload endpoint was responsible for doing everything in a single request lifecycle. Once a user uploaded a CSV file, the API would parse the file row by row, insert each transaction into Cassandra, fetch historical pricing data for every transaction, perform reconciliation, calculate balances and tax metrics, and return a response only after all processing was complete. Every upload request therefore had to wait for large-file parsing, tens of thousands of database writes, per-transaction external API calls, and the reconciliation and tax calculations. The handler and the end-to-end flow are sketched in the code block and diagram further down.

There was also no automated retry mechanism: if processing failed midway, recovery required manual intervention. At the same time, another API responsible for returning transaction data to the frontend fetched raw records from Cassandra and performed calculations inside the request path. That endpoint routinely took 30–40 seconds to complete.
Each stage blocked the next. The system accumulated CPU-bound work (parsing, calculations) and IO-bound work (database writes, external API calls) inside a single HTTP request.

## Rethinking the Request Lifecycle

The first architectural decision was simple: move heavy work out of the request path. Instead of completing ingestion synchronously, the upload API was redesigned to become status-based. It now initiates a pipeline rather than executing it: it records a job, publishes a message to a queue, and returns immediately with a job ID the frontend can poll.

## Building a Staged Async Pipeline

Processing was decomposed into multiple independent consumers, one per stage: insert, historical pricing, reconciliation, and tax/metrics. Each stage became independently scalable and observable.

## Handling Large CSV Files

Files were processed in dynamic chunks (typically 500–2,000 rows, depending on structure). Chunk-level parallelism was introduced using threads, configurable via environment variables (defaulting to 8). This reduced memory spikes and improved CPU utilization during parsing and metric computation.

## Optimizing External API Calls with a Master–Worker Pattern

Originally, historical pricing APIs were called sequentially, once per transaction. This was redesigned as a master–worker model: the master groups transactions into batches and coordinates them with `asyncio.gather`, while workers process batches in parallel and write results directly to Cassandra. This allowed controlled concurrency, better rate-limit handling, and parallel processing instead of sequential IO.

## Optimizing the Read Path

The read API was slow because it was calculating metrics at request time. Instead of using a materialized view, a new Cassandra table was introduced that contains only frontend-required fields plus the computed metrics, and excludes ingestion-only data. A new consumer transforms processed records into this optimized table, so the read API now simply applies filters and limits on precomputed data.

## Measured Impact

- Upload API: 5–10 minutes → ~0.5 seconds
- End-to-end processing: 5–20 minutes → 2–4 minutes
- Read API: 30–40 seconds → ~0.5–1 second
- CPU utilization: ~5–8% → ~70–80%

The improvement was not just about latency reduction. It was about reshaping how work flowed through the system. This redesign changed how I approach ingestion systems: when processing grows, the architecture must evolve with it.

If you've worked on ingestion pipelines or faced similar architectural bottlenecks, I'd be interested to hear how you approached it. Happy to discuss trade-offs or alternative designs.
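The chunked, thread-parallel parsing described above can be sketched as follows. This is a minimal illustration, not the production code: the function names, the chunk size, and the environment-variable name are assumptions.

```python
import csv
import io
import os
from concurrent.futures import ThreadPoolExecutor

# Worker count configurable via an environment variable, defaulting to 8,
# mirroring the setup described above (the variable name is hypothetical).
WORKERS = int(os.environ.get("CSV_WORKERS", "8"))

def read_chunks(file_obj, chunk_size=1000):
    """Yield lists of rows so the whole file is never held in memory."""
    reader = csv.DictReader(file_obj)
    chunk = []
    for row in reader:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def process_chunk(chunk):
    # Placeholder for per-chunk work (inserts, metric computation, ...).
    return len(chunk)

def process_file(file_obj, chunk_size=1000):
    # Chunk-level parallelism: each chunk is handled by a pool thread.
    with ThreadPoolExecutor(max_workers=WORKERS) as pool:
        return list(pool.map(process_chunk, read_chunks(file_obj, chunk_size)))
```

Streaming chunks from a generator keeps memory bounded even for 600,000-row files, while the thread pool keeps the CPU busy during IO-heavy chunk work.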
@app.post("/upload")
def upload_csv(file): rows = parse_csv(file) for row in rows: insert_into_cassandra(row) price = fetch_historical_price(row) reconciled = reconcile_transaction(row, price) processed = calculate_metrics(reconciled) update_transaction(processed) return {"status": "completed"} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
@app.post("/upload")
def upload_csv(file): rows = parse_csv(file) for row in rows: insert_into_cassandra(row) price = fetch_historical_price(row) reconciled = reconcile_transaction(row, price) processed = calculate_metrics(reconciled) update_transaction(processed) return {"status": "completed"} CODE_BLOCK:
@app.post("/upload")
def upload_csv(file): rows = parse_csv(file) for row in rows: insert_into_cassandra(row) price = fetch_historical_price(row) reconciled = reconcile_transaction(row, price) processed = calculate_metrics(reconciled) update_transaction(processed) return {"status": "completed"} CODE_BLOCK:
The ingestion flow, end to end:

```
User
 │
 ▼
Upload API
 │
 ▼
Parse CSV
 │
 ▼
Insert into Cassandra
 │
 ▼
Fetch Historical Price (External API)
 │
 ▼
Reconciliation
 │
 ▼
Tax / Metrics Calculation
 │
 ▼
HTTP Response
```
@app.post("/upload")
def upload_csv(file): job_id = create_job_record(status="queued") queue.publish({"job_id": job_id}) return {"job_id": job_id, "status": "processing"} Enter fullscreen mode Exit fullscreen mode CODE_BLOCK:
@app.post("/upload")
def upload_csv(file): job_id = create_job_record(status="queued") queue.publish({"job_id": job_id}) return {"job_id": job_id, "status": "processing"} CODE_BLOCK:
@app.post("/upload")
def upload_csv(file): job_id = create_job_record(status="queued") queue.publish({"job_id": job_id}) return {"job_id": job_id, "status": "processing"} CODE_BLOCK:
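Since the upload endpoint now only hands back a `job_id`, the frontend polls a companion status endpoint. A minimal sketch of the job store behind such an endpoint, with an in-memory dict standing in for the real job table (all names and fields here are illustrative, not the actual service):

```python
import uuid

# In-memory stand-in for the job/status table; the real system would
# persist this in a database so consumers can update it.
JOBS = {}

def create_job_record(status="queued"):
    """Create a job row and return its ID (as the upload endpoint does)."""
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": status, "progress": 0}
    return job_id

def update_job(job_id, status, progress):
    """Called by pipeline consumers as each stage completes."""
    JOBS[job_id].update(status=status, progress=progress)

def get_job_status(job_id):
    """Handler body for a GET /jobs/{job_id} style polling endpoint."""
    job = JOBS.get(job_id)
    if job is None:
        return {"error": "unknown job"}
    return {"job_id": job_id, **job}
```

The frontend can then poll until the status flips to a terminal state instead of holding an HTTP connection open for minutes.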
The staged pipeline, one queue and consumer per stage:

```
Upload API
 │
 ▼
Queue A → Insert Consumer
 │
 ▼
Queue B → Historical Price Workers
 │
 ▼
Queue C → Reconciliation Consumer
 │
 ▼
Queue D → Tax / Metrics Consumer
 │
 ▼
Read-Optimized Table
```
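Each consumer in the diagram has the same shape: read a message, do one stage of work, hand off to the next stage. A sketch of the final stage, which computes metrics and projects only frontend-required fields into the read-optimized table; the field names, the metric, and the dict standing in for the table are all hypothetical:

```python
# Stand-in for the read-optimized Cassandra table, keyed by transaction ID.
read_optimized_table = {}

def calculate_metrics(txn):
    # Illustrative metric; the real calculation covers balances and tax.
    return {"net_amount": txn["amount"] - txn.get("fee", 0)}

def metrics_consumer(message):
    """Final pipeline stage: project a processed record into the read table."""
    txn = message["transaction"]
    record = {
        # Keep only frontend-required fields plus computed metrics;
        # ingestion-only data (raw lines, reconciliation details) is dropped.
        "txn_id": txn["txn_id"],
        "amount": txn["amount"],
        **calculate_metrics(txn),
    }
    read_optimized_table[txn["txn_id"]] = record
    return record
```

Because the projection happens once at write time, the read API only filters and limits precomputed rows, which is what brought it from 30–40 seconds down to sub-second responses.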
The master coordinating batch workers with `asyncio.gather`:

```python
import asyncio

async def master(batch_groups):
    # One task per batch; workers write their results directly to Cassandra.
    tasks = [worker.process(batch) for batch in batch_groups]
    results = await asyncio.gather(*tasks)
    return results
```
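Plain `asyncio.gather` launches every task at once; since the pricing service is rate-limited, the concurrency has to be capped. One common way to do that is a semaphore, sketched below; the limit, the fetch stub, and the function names are illustrative, not the production implementation:

```python
import asyncio

MAX_IN_FLIGHT = 10  # tuned to the pricing API's rate limit (illustrative)

async def fetch_price(txn):
    # Stand-in for the external historical-price call.
    await asyncio.sleep(0)
    return {"txn": txn, "price": 1.0}

async def bounded_master(batches):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)

    async def run(batch):
        async with sem:  # at most MAX_IN_FLIGHT batches in flight at once
            return [await fetch_price(txn) for txn in batch]

    # Still gather everything, but the semaphore throttles concurrency.
    return await asyncio.gather(*(run(b) for b in batches))
```

This keeps the parallelism intentional and controlled, rather than letting a 600,000-row file translate into an unbounded burst of external calls.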
## Engineering Takeaways

- Heavy work does not belong in the request lifecycle.
- Separate ingestion from presentation.
- Parallelism should be intentional and controlled.
- External rate limits are architectural constraints.
- Pipelines scale better than monolithic request handlers.