Tools
Tools: Building Multi-Stage LLM Pipelines with Job Dependencies
2026-01-22
0 views
admin
The Pattern ## The Code ## Key Points Modern AI apps don't call a single LLM. They chain multiple steps: embed a query, search a vector DB, generate a response. Each step depends on the previous one. https://github.com/egeominotti/flashq https://flashq.dev/ Works great for RAG, document processing, content generation, or any multi-step AI workflow. Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to ? It will become hidden in your post, but will still be visible via the comment's permalink. as well , this person and/or CODE_BLOCK: ┌─────────┐ ┌─────────┐ ┌──────────┐ │ Embed │ ──▶ │ Search │ ──▶ │ Generate │ └─────────┘ └─────────┘ └──────────┘ CODE_BLOCK: ┌─────────┐ ┌─────────┐ ┌──────────┐ │ Embed │ ──▶ │ Search │ ──▶ │ Generate │ └─────────┘ └─────────┘ └──────────┘ CODE_BLOCK: ┌─────────┐ ┌─────────┐ ┌──────────┐ │ Embed │ ──▶ │ Search │ ──▶ │ Generate │ └─────────┘ └─────────┘ └──────────┘ COMMAND_BLOCK: import { Queue, Worker } from 'flashq'; import OpenAI from 'openai'; const openai = new OpenAI(); const queue = new Queue('ai-pipeline'); // Define the pipeline async function ask(question: string) { const embed = await queue.add('embed', { text: question }); const search = await queue.add('search', { topK: 5 }, { dependsOn: [embed.id] }); const generate = await queue.add('generate', {}, { dependsOn: [search.id] }); return queue.finished(generate.id); } // Worker handles all stages new Worker('ai-pipeline', async (job) => { switch (job.name) { case 'embed': const embed = await openai.embeddings.create({ input: job.data.text, model: 'text-embedding-3-small' }); return { embedding: embed.data[0].embedding, text: job.data.text }; case 'search': const results = await vectorDB.search({ vector: job.parentResult.embedding, topK: job.data.topK }); return { documents: results, query: job.parentResult.text }; case 'generate': const context = job.parentResult.documents.map(d => d.content).join('\n\n'); const response = await 
openai.chat.completions.create({ model: 'gpt-4', messages: [ { role: 'system', content: 'Answer based on context.' }, { role: 'user', content: `Context:\n${context}\n\nQuestion: ${job.parentResult.query}` } ] }); return response.choices[0].message.content; } }); // Usage const answer = await ask('What is the capital of France?'); COMMAND_BLOCK: import { Queue, Worker } from 'flashq'; import OpenAI from 'openai'; const openai = new OpenAI(); const queue = new Queue('ai-pipeline'); // Define the pipeline async function ask(question: string) { const embed = await queue.add('embed', { text: question }); const search = await queue.add('search', { topK: 5 }, { dependsOn: [embed.id] }); const generate = await queue.add('generate', {}, { dependsOn: [search.id] }); return queue.finished(generate.id); } // Worker handles all stages new Worker('ai-pipeline', async (job) => { switch (job.name) { case 'embed': const embed = await openai.embeddings.create({ input: job.data.text, model: 'text-embedding-3-small' }); return { embedding: embed.data[0].embedding, text: job.data.text }; case 'search': const results = await vectorDB.search({ vector: job.parentResult.embedding, topK: job.data.topK }); return { documents: results, query: job.parentResult.text }; case 'generate': const context = job.parentResult.documents.map(d => d.content).join('\n\n'); const response = await openai.chat.completions.create({ model: 'gpt-4', messages: [ { role: 'system', content: 'Answer based on context.' 
}, { role: 'user', content: `Context:\n${context}\n\nQuestion: ${job.parentResult.query}` } ] }); return response.choices[0].message.content; } }); // Usage const answer = await ask('What is the capital of France?'); COMMAND_BLOCK: import { Queue, Worker } from 'flashq'; import OpenAI from 'openai'; const openai = new OpenAI(); const queue = new Queue('ai-pipeline'); // Define the pipeline async function ask(question: string) { const embed = await queue.add('embed', { text: question }); const search = await queue.add('search', { topK: 5 }, { dependsOn: [embed.id] }); const generate = await queue.add('generate', {}, { dependsOn: [search.id] }); return queue.finished(generate.id); } // Worker handles all stages new Worker('ai-pipeline', async (job) => { switch (job.name) { case 'embed': const embed = await openai.embeddings.create({ input: job.data.text, model: 'text-embedding-3-small' }); return { embedding: embed.data[0].embedding, text: job.data.text }; case 'search': const results = await vectorDB.search({ vector: job.parentResult.embedding, topK: job.data.topK }); return { documents: results, query: job.parentResult.text }; case 'generate': const context = job.parentResult.documents.map(d => d.content).join('\n\n'); const response = await openai.chat.completions.create({ model: 'gpt-4', messages: [ { role: 'system', content: 'Answer based on context.' }, { role: 'user', content: `Context:\n${context}\n\nQuestion: ${job.parentResult.query}` } ] }); return response.choices[0].message.content; } }); // Usage const answer = await ask('What is the capital of France?'); - dependsOn: [jobId] chains jobs in sequence - job.parentResult accesses the previous job's output - queue.finished(id) waits for the entire pipeline - Failed jobs stop the pipeline — configure attempts for retries
tools, utilities, security tools, building, multistage, pipelines, dependencies