# Tools: Building Multi-Stage LLM Pipelines with Job Dependencies


Source: Dev.to

## The Pattern

Modern AI apps don't call a single LLM. They chain multiple steps: embed a query, search a vector DB, generate a response. Each step depends on the previous one.

```
┌─────────┐     ┌─────────┐     ┌──────────┐
│  Embed  │ ──▶ │ Search  │ ──▶ │ Generate │
└─────────┘     └─────────┘     └──────────┘
```

## The Code

```typescript
import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('ai-pipeline');

// Define the pipeline: each job runs only after its dependency completes.
async function ask(question: string) {
  const embed = await queue.add('embed', { text: question });
  const search = await queue.add('search', { topK: 5 }, { dependsOn: [embed.id] });
  const generate = await queue.add('generate', {}, { dependsOn: [search.id] });
  return queue.finished(generate.id);
}

// One worker handles all stages, dispatching on the job name.
new Worker('ai-pipeline', async (job) => {
  switch (job.name) {
    case 'embed': {
      const embedding = await openai.embeddings.create({
        input: job.data.text,
        model: 'text-embedding-3-small',
      });
      return { embedding: embedding.data[0].embedding, text: job.data.text };
    }
    case 'search': {
      // `vectorDB` is your vector store client (Pinecone, Qdrant, etc.).
      const results = await vectorDB.search({
        vector: job.parentResult.embedding,
        topK: job.data.topK,
      });
      return { documents: results, query: job.parentResult.text };
    }
    case 'generate': {
      const context = job.parentResult.documents.map((d) => d.content).join('\n\n');
      const response = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [
          { role: 'system', content: 'Answer based on context.' },
          { role: 'user', content: `Context:\n${context}\n\nQuestion: ${job.parentResult.query}` },
        ],
      });
      return response.choices[0].message.content;
    }
  }
});

// Usage
const answer = await ask('What is the capital of France?');
```

Works great for RAG, document processing, content generation, or any multi-step AI workflow.

- https://github.com/egeominotti/flashq
- https://flashq.dev/

## Key Points

- `dependsOn: [jobId]` chains jobs in sequence
- `job.parentResult` accesses the previous job's output
- `queue.finished(id)` waits for the entire pipeline
- Failed jobs stop the pipeline; configure attempts for retries
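To make the `dependsOn` / `parentResult` semantics concrete without a running queue, here is a minimal in-memory sketch (not flashq's internals, just the concept): each handler receives the previous stage's result, and the chain resolves to the final stage's output, much like `queue.finished()`. The `runChain` helper and the toy handlers are illustrative names, not part of the flashq API.

```typescript
// A handler gets its own job data plus the previous stage's result.
type Handler = (data: any, parentResult: any) => Promise<unknown>;

interface JobSpec {
  name: string;
  data: any;
  handler: Handler;
}

// Run jobs strictly in dependency order, threading each result forward.
// This is what a dependency chain of length N collapses to conceptually.
async function runChain(jobs: JobSpec[]): Promise<unknown> {
  let parentResult: unknown = undefined;
  for (const job of jobs) {
    parentResult = await job.handler(job.data, parentResult);
  }
  return parentResult; // result of the final stage, like queue.finished()
}

// Toy three-stage pipeline mirroring embed -> search -> generate.
const answer = await runChain([
  {
    name: 'embed',
    data: { text: 'hello' },
    handler: async (d) => ({ embedding: [0.1, 0.2], text: d.text }),
  },
  {
    name: 'search',
    data: { topK: 2 },
    handler: async (d, p) => ({
      documents: ['doc A', 'doc B'].slice(0, d.topK),
      query: p.text, // carried forward from the embed stage
    }),
  },
  {
    name: 'generate',
    data: {},
    handler: async (_d, p) => `Answer for "${p.query}" from ${p.documents.length} docs`,
  },
]);
// answer === 'Answer for "hello" from 2 docs'
```

The point of handing this wiring to a queue instead of a plain `for` loop is that each stage becomes an independently retryable, persistable job rather than a step inside one long-lived process.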
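The last key point notes that a failed job stops the pipeline unless retries are configured. The exact flashq retry option isn't shown in this post, but the behavior can be sketched generically: rerun a failing stage up to `attempts` times, and only propagate the error (halting the pipeline) once every attempt has failed. `withRetries` is a hypothetical helper, not a flashq API.

```typescript
// Rerun `fn` up to `attempts` times; rethrow the last error if all fail.
async function withRetries<T>(fn: () => Promise<T>, attempts: number): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  throw lastError; // all attempts exhausted: the pipeline stops here
}

// A flaky stage that fails twice, then succeeds on the third attempt.
let calls = 0;
const result = await withRetries(async () => {
  calls++;
  if (calls < 3) throw new Error('transient failure');
  return 'ok';
}, 3);
// result === 'ok', calls === 3
```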