Auto-Sync RAG Pipeline: Blob Events to Azure AI Search with Terraform ⚡


🏗️ Architecture Overview

🔧 Terraform: The Full Pipeline

Event Grid and Service Bus

Function App

Function App Code

🔄 Change and Delete Detection

⚠️ Edge Cases and Gotchas

📐 Alternative: Scheduled Indexer (Simpler)

🔄 Tri-Cloud Auto-Sync Comparison

⏭️ What's Next

Azure AI Search indexers can run on a schedule, but that means stale results between intervals. Here's how to build an event-driven pipeline with Event Grid, Service Bus batching, and a Function App that triggers indexer runs on demand, all provisioned with Terraform.

You've built your Azure AI Search index, configured a blob indexer, and set it to run on a 5-minute schedule. But that means documents uploaded between intervals sit unindexed. For internal tools this might be fine. For customer-facing RAG, 5 minutes of stale results is too long.

Azure AI Search indexers support scheduled runs (minimum every 5 minutes) or on-demand triggers via the REST API. This post builds an event-driven pipeline with Terraform: Blob Storage events fire through Event Grid, batch in a Service Bus queue, and invoke a Function App that calls the Run Indexer API. Near-real-time sync without polling.

🎯 Why not just use the scheduled indexer?

You can, and for many workloads it's the right choice. The scheduled indexer handles change detection automatically using blob timestamps. But the minimum interval is 5 minutes, and you pay for indexer execution time even when nothing changed. Event-driven sync triggers only when documents actually change, and the indexer processes them immediately.

Why Service Bus between Event Grid and the Function? Same batching problem as AWS: uploading 50 documents fires 50 events. Each would trigger a separate Run Indexer call, but only one indexer run can execute at a time. Service Bus collects the events, and the Function processes the batch with a single indexer run.

The subject_filter ensures only events from your documents container trigger the pipeline, ignoring uploads to other containers in the same storage account.

Azure AI Search indexes, indexers, and data sources don't have native Terraform resources. The azurerm provider covers the search service itself, but data plane operations (creating indexes, running indexers) require REST API calls.
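One way to bridge that data plane gap is a small bootstrap script that PUTs the indexer definition to the Search REST API (for example, invoked from a Terraform null_resource with a local-exec provisioner). A minimal sketch, using only the standard library; the service name, indexer name, and data source names below are placeholders:

```python
import json

# Hypothetical helper: builds the URL and JSON body for creating (or updating)
# an indexer through the Azure AI Search data plane REST API. A PUT to
# /indexers/{name} with an "api-key" admin-key header creates the indexer.
def build_indexer_request(service_name: str, indexer_name: str,
                          data_source: str, index_name: str,
                          api_version: str = "2024-07-01"):
    url = (
        f"https://{service_name}.search.windows.net"
        f"/indexers/{indexer_name}?api-version={api_version}"
    )
    body = {
        "name": indexer_name,
        "dataSourceName": data_source,
        "targetIndexName": index_name,
    }
    return url, json.dumps(body)

url, payload = build_indexer_request("my-search", "my-blob-indexer",
                                     "blob-datasource", "rag-index")
# Send `payload` as the PUT body with headers
# {"api-key": <admin key>, "Content-Type": "application/json"}.
print(url)
```

The same pattern works for creating the index and data source; only the path segment changes.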
The Function App handles this. The Consumption plan (Y1) means you pay only when the function executes. No idle cost.

The 409 handler is critical. When the indexer is already running, the API returns 409 Conflict. Like the AWS ConflictException pattern, we log and skip: the running indexer already picks up the new blobs through its change detection mechanism.

Azure AI Search blob indexers have built-in change detection based on blob timestamps. When you trigger Run Indexer, it only processes blobs that have been modified since the last successful run. You don't need to track which files changed; the indexer handles this automatically.

For deletions, use the soft delete pattern. Instead of deleting blobs directly, add a metadata property like IsDeleted = true and configure the indexer's data source with a soft delete column detection policy. The indexer sees the metadata change, finds the matching document in the search index, and removes it. This is the approach Microsoft recommends, because hard deletes from blob storage don't propagate to the search index.

Indexer execution limits. Azure AI Search indexers have a maximum execution time that varies by tier: 2 hours for Basic, 24 hours for Standard. If you're indexing large documents with AI enrichment (OCR, chunking, embedding), a single run can take significant time. Plan accordingly.

One indexer run at a time. Just like Bedrock and RAG Engine, only one execution can run per indexer at a time. The 409 handling in the function code addresses this.

Service Bus message lock. The lock_duration = "PT2M" gives the function 2 minutes to process each message. If the function times out, the message becomes visible again and is retried. After 3 failed attempts (max_delivery_count), it goes to the dead-letter queue.

Event Grid retry policy. Event Grid retries failed deliveries to Service Bus with exponential backoff for up to 24 hours.
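Marking a blob as soft-deleted is just a metadata write, not a delete. As a sketch, that write can be expressed as a plain Blob service REST call: user metadata travels as x-ms-meta-* headers on PUT <blob-url>?comp=metadata. Authentication (SAS token or shared key) is elided here, and the account, container, and blob names are placeholders:

```python
# Hypothetical sketch: build the "Set Blob Metadata" request that marks a blob
# as soft-deleted. The indexer's soft delete policy watches the IsDeleted
# metadata column configured in the data source.
def soft_delete_request(account: str, container: str, blob_name: str):
    url = (
        f"https://{account}.blob.core.windows.net"
        f"/{container}/{blob_name}?comp=metadata"
    )
    headers = {
        # Metadata key "IsDeleted" becomes an x-ms-meta-* header
        "x-ms-meta-IsDeleted": "true",
    }
    return url, headers

url, headers = soft_delete_request("mydocs", "documents", "report.pdf")
print(url)
```

In practice you would issue this through the azure-storage-blob SDK (set_blob_metadata) rather than raw REST; the point is that the blob survives, and only its metadata signals the deletion to the indexer.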
Between Event Grid retries and Service Bus dead-lettering, you get robust failure handling without custom code.

If event-driven is overkill, use the built-in indexer schedule, configured via the AI Search REST API when creating the indexer. This polls for changes every 5 minutes: no Event Grid, no Service Bus, no Function App. The trade-off is a 0-5 minute delay before new documents appear in search results.

All three clouds follow the same pattern: storage event → message queue → serverless function → sync API. The implementations differ, but the architecture is identical.

This is Post 4 of the Azure RAG Pipeline with Terraform series. Your search index now stays current automatically. Upload a document to Blob Storage, and within seconds Event Grid fires, Service Bus batches, and the Function App triggers your indexer. No polling, no stale results, no wasted compute.

⚡ Found this helpful? Follow for the full RAG Pipeline with Terraform series!
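Whichever trigger you choose, you can confirm a run completed with GET /indexers/{name}/status, whose response includes a lastResult object with fields like status, itemsProcessed, and itemsFailed. A sketch of parsing that response; the sample payload below mimics the documented shape and is illustrative, not captured from a live service:

```python
# Illustrative sketch: summarize an indexer status response from
# GET /indexers/{name}/status?api-version=2024-07-01.
def summarize_last_run(status_response: dict) -> str:
    last = status_response.get("lastResult") or {}
    if last.get("status") == "success":
        return f"ok: {last.get('itemsProcessed', 0)} items indexed"
    return (
        f"check indexer: status={last.get('status')}, "
        f"errors={last.get('itemsFailed', 0)}"
    )

# Sample payload shaped like the real response (not real data)
sample = {
    "status": "running",
    "lastResult": {"status": "success", "itemsProcessed": 12, "itemsFailed": 0},
}
print(summarize_last_run(sample))  # ok: 12 items indexed
```

Wiring this into a Monitor alert alongside the dead-letter queue gives you visibility into both delivery failures and indexing failures.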


```
Blob Storage (docs added/updated/deleted)
        ↓ Event Grid Subscription
Service Bus Queue (batches events, 2-min lock)
        ↓ Function App Trigger
Function App (calls POST /indexers/{name}/run)
        ↓ On failure
Service Bus Dead Letter Queue → Monitor Alert
```

```hcl
# sync/event_grid.tf
resource "azurerm_servicebus_namespace" "this" {
  name                = "${var.environment}-${var.project}-sb"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  sku                 = "Standard"
}

resource "azurerm_servicebus_queue" "indexer_sync" {
  name               = "indexer-sync"
  namespace_id       = azurerm_servicebus_namespace.this.id
  lock_duration      = "PT2M"
  max_delivery_count = 3

  dead_lettering_on_message_expiration = true
}

# Event Grid system topic for the storage account
resource "azurerm_eventgrid_system_topic" "blob_events" {
  name                   = "${var.environment}-blob-events"
  location               = azurerm_resource_group.this.location
  resource_group_name    = azurerm_resource_group.this.name
  source_arm_resource_id = azurerm_storage_account.docs.id
  topic_type             = "Microsoft.Storage.StorageAccounts"
}

# Route blob events to the Service Bus queue
resource "azurerm_eventgrid_system_topic_event_subscription" "blob_to_sb" {
  name                = "blob-to-indexer-sync"
  system_topic        = azurerm_eventgrid_system_topic.blob_events.name
  resource_group_name = azurerm_resource_group.this.name

  service_bus_queue_endpoint_id = azurerm_servicebus_queue.indexer_sync.id

  included_event_types = [
    "Microsoft.Storage.BlobCreated",
    "Microsoft.Storage.BlobDeleted"
  ]

  subject_filter {
    subject_begins_with = "/blobServices/default/containers/${var.docs_container_name}/blobs/"
  }
}
```

```hcl
# sync/function_app.tf
resource "azurerm_service_plan" "sync" {
  name                = "${var.environment}-sync-plan"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  os_type             = "Linux"
  sku_name            = "Y1" # Consumption plan
}

resource "azurerm_linux_function_app" "indexer_sync" {
  name                       = "${var.environment}-indexer-sync"
  location                   = azurerm_resource_group.this.location
  resource_group_name        = azurerm_resource_group.this.name
  service_plan_id            = azurerm_service_plan.sync.id
  storage_account_name       = azurerm_storage_account.function.name
  storage_account_access_key = azurerm_storage_account.function.primary_access_key

  site_config {
    application_stack {
      python_version = "3.11"
    }
  }

  app_settings = {
    SEARCH_SERVICE_NAME   = azurerm_search_service.this.name
    SEARCH_ADMIN_KEY      = azurerm_search_service.this.primary_key
    INDEXER_NAME          = var.indexer_name
    SERVICEBUS_CONNECTION = azurerm_servicebus_namespace.this.default_primary_connection_string
  }
}
```

```python
# sync/function_code/function_app.py
import azure.functions as func
import requests
import os
import logging

app = func.FunctionApp()

@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="indexer-sync",
    connection="SERVICEBUS_CONNECTION"
)
def run_indexer(msg: func.ServiceBusMessage):
    """Triggered by Service Bus messages from blob events."""
    service_name = os.environ["SEARCH_SERVICE_NAME"]
    admin_key = os.environ["SEARCH_ADMIN_KEY"]
    indexer_name = os.environ["INDEXER_NAME"]

    event_data = msg.get_body().decode("utf-8")
    logging.info(f"Blob event received: {event_data[:200]}")

    # Call the Run Indexer REST API
    url = (
        f"https://{service_name}.search.windows.net"
        f"/indexers/{indexer_name}/run"
        f"?api-version=2024-07-01"
    )
    headers = {
        "api-key": admin_key,
        "Content-Type": "application/json"
    }
    response = requests.post(url, headers=headers)

    if response.status_code == 202:
        logging.info(f"Indexer run triggered: {indexer_name}")
    elif response.status_code == 409:
        # Indexer already running - safe to skip
        logging.info("Indexer already running, skipping")
    else:
        logging.error(
            f"Failed to trigger indexer: {response.status_code} "
            f"{response.text}"
        )
        raise Exception(f"Indexer run failed: {response.status_code}")
```

```json
{
  "dataDeletionDetectionPolicy": {
    "@odata.type": "#Microsoft.Azure.Search.SoftDeleteColumnDeletionDetectionPolicy",
    "softDeleteColumnName": "IsDeleted",
    "softDeleteMarkerValue": "true"
  }
}
```

```json
{
  "name": "my-blob-indexer",
  "dataSourceName": "blob-datasource",
  "targetIndexName": "rag-index",
  "schedule": { "interval": "PT5M" }
}
```

- Post 1: Azure AI Search RAG - Basic Setup 🔍
- Post 2: Advanced RAG - Three-Layer Retrieval 🧠
- Post 3: Cosmos DB Vector Search - NoSQL-Native RAG 💰
- Post 4: Auto-Sync Pipeline (you are here) ⚡