Blob Storage (docs added/updated/deleted)
    ↓ Event Grid Subscription
Service Bus Queue (batches events, 2-min lock)
    ↓ Function App Trigger
Function App (calls POST /indexers/{name}/run)
    ↓ On failure
Service Bus Dead Letter Queue → Monitor Alert
Blob Storage (docs added/updated/deleted)
    ↓ Event Grid Subscription
Service Bus Queue (batches events, 2-min lock)
    ↓ Function App Trigger
Function App (calls POST /indexers/{name}/run)
    ↓ On failure
Service Bus Dead Letter Queue → Monitor Alert
Blob Storage (docs added/updated/deleted)
    ↓ Event Grid Subscription
Service Bus Queue (batches events, 2-min lock)
    ↓ Function App Trigger
Function App (calls POST /indexers/{name}/run)
    ↓ On failure
Service Bus Dead Letter Queue → Monitor Alert
# sync/event_grid.tf
#
# Wires blob change events from the docs storage account into a Service Bus
# queue: storage account -> Event Grid system topic -> event subscription ->
# "indexer-sync" queue.

# Service Bus namespace that hosts the indexer-sync queue.
resource "azurerm_servicebus_namespace" "this" {
  name                = "${var.environment}-${var.project}-sb"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  sku                 = "Standard"
}

# Queue that buffers blob events until the Function App consumes them.
resource "azurerm_servicebus_queue" "indexer_sync" {
  name         = "indexer-sync"
  namespace_id = azurerm_servicebus_namespace.this.id

  lock_duration      = "PT2M" # ISO 8601 duration: 2-minute message lock
  max_delivery_count = 3      # delivery attempts before the message is dead-lettered

  # Expired messages are moved to the dead-letter queue instead of being dropped.
  dead_lettering_on_message_expiration = true
}

# Event Grid system topic for the storage account
resource "azurerm_eventgrid_system_topic" "blob_events" {
  name                   = "${var.environment}-blob-events"
  location               = azurerm_resource_group.this.location
  resource_group_name    = azurerm_resource_group.this.name
  source_arm_resource_id = azurerm_storage_account.docs.id
  topic_type             = "Microsoft.Storage.StorageAccounts"
}

# Route blob events to Service Bus queue
resource "azurerm_eventgrid_system_topic_event_subscription" "blob_to_sb" {
  name                = "blob-to-indexer-sync"
  system_topic        = azurerm_eventgrid_system_topic.blob_events.name
  resource_group_name = azurerm_resource_group.this.name

  service_bus_queue_endpoint_id = azurerm_servicebus_queue.indexer_sync.id

  # Only blob create and delete events are forwarded.
  included_event_types = [
    "Microsoft.Storage.BlobCreated",
    "Microsoft.Storage.BlobDeleted",
  ]

  # Restrict delivery to blobs inside the docs container; the empty suffix
  # places no constraint on the blob name ending.
  subject_filter {
    subject_begins_with = "/blobServices/default/containers/${var.docs_container_name}/blobs/"
    subject_ends_with   = ""
  }
}
# sync/event_grid.tf
#
# Wires blob change events from the docs storage account into a Service Bus
# queue: storage account -> Event Grid system topic -> event subscription ->
# "indexer-sync" queue.

# Service Bus namespace that hosts the indexer-sync queue.
resource "azurerm_servicebus_namespace" "this" {
  name                = "${var.environment}-${var.project}-sb"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  sku                 = "Standard"
}

# Queue that buffers blob events until the Function App consumes them.
resource "azurerm_servicebus_queue" "indexer_sync" {
  name         = "indexer-sync"
  namespace_id = azurerm_servicebus_namespace.this.id

  lock_duration      = "PT2M" # ISO 8601 duration: 2-minute message lock
  max_delivery_count = 3      # delivery attempts before the message is dead-lettered

  # Expired messages are moved to the dead-letter queue instead of being dropped.
  dead_lettering_on_message_expiration = true
}

# Event Grid system topic for the storage account
resource "azurerm_eventgrid_system_topic" "blob_events" {
  name                   = "${var.environment}-blob-events"
  location               = azurerm_resource_group.this.location
  resource_group_name    = azurerm_resource_group.this.name
  source_arm_resource_id = azurerm_storage_account.docs.id
  topic_type             = "Microsoft.Storage.StorageAccounts"
}

# Route blob events to Service Bus queue
resource "azurerm_eventgrid_system_topic_event_subscription" "blob_to_sb" {
  name                = "blob-to-indexer-sync"
  system_topic        = azurerm_eventgrid_system_topic.blob_events.name
  resource_group_name = azurerm_resource_group.this.name

  service_bus_queue_endpoint_id = azurerm_servicebus_queue.indexer_sync.id

  # Only blob create and delete events are forwarded.
  included_event_types = [
    "Microsoft.Storage.BlobCreated",
    "Microsoft.Storage.BlobDeleted",
  ]

  # Restrict delivery to blobs inside the docs container; the empty suffix
  # places no constraint on the blob name ending.
  subject_filter {
    subject_begins_with = "/blobServices/default/containers/${var.docs_container_name}/blobs/"
    subject_ends_with   = ""
  }
}
# sync/event_grid.tf
#
# Wires blob change events from the docs storage account into a Service Bus
# queue: storage account -> Event Grid system topic -> event subscription ->
# "indexer-sync" queue.

# Service Bus namespace that hosts the indexer-sync queue.
resource "azurerm_servicebus_namespace" "this" {
  name                = "${var.environment}-${var.project}-sb"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  sku                 = "Standard"
}

# Queue that buffers blob events until the Function App consumes them.
resource "azurerm_servicebus_queue" "indexer_sync" {
  name         = "indexer-sync"
  namespace_id = azurerm_servicebus_namespace.this.id

  lock_duration      = "PT2M" # ISO 8601 duration: 2-minute message lock
  max_delivery_count = 3      # delivery attempts before the message is dead-lettered

  # Expired messages are moved to the dead-letter queue instead of being dropped.
  dead_lettering_on_message_expiration = true
}

# Event Grid system topic for the storage account
resource "azurerm_eventgrid_system_topic" "blob_events" {
  name                   = "${var.environment}-blob-events"
  location               = azurerm_resource_group.this.location
  resource_group_name    = azurerm_resource_group.this.name
  source_arm_resource_id = azurerm_storage_account.docs.id
  topic_type             = "Microsoft.Storage.StorageAccounts"
}

# Route blob events to Service Bus queue
resource "azurerm_eventgrid_system_topic_event_subscription" "blob_to_sb" {
  name                = "blob-to-indexer-sync"
  system_topic        = azurerm_eventgrid_system_topic.blob_events.name
  resource_group_name = azurerm_resource_group.this.name

  service_bus_queue_endpoint_id = azurerm_servicebus_queue.indexer_sync.id

  # Only blob create and delete events are forwarded.
  included_event_types = [
    "Microsoft.Storage.BlobCreated",
    "Microsoft.Storage.BlobDeleted",
  ]

  # Restrict delivery to blobs inside the docs container; the empty suffix
  # places no constraint on the blob name ending.
  subject_filter {
    subject_begins_with = "/blobServices/default/containers/${var.docs_container_name}/blobs/"
    subject_ends_with   = ""
  }
}
# sync/function_app.tf
#
# Hosting plan and Linux Function App that consume the "indexer-sync" queue.

# Linux hosting plan for the sync function.
resource "azurerm_service_plan" "sync" {
  name                = "${var.environment}-sync-plan"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  os_type             = "Linux"
  sku_name            = "Y1" # Consumption plan
}

# Python 3.11 Function App; its settings are read by the function code as
# environment variables.
resource "azurerm_linux_function_app" "indexer_sync" {
  name                = "${var.environment}-indexer-sync"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  service_plan_id     = azurerm_service_plan.sync.id

  storage_account_name       = azurerm_storage_account.function.name
  storage_account_access_key = azurerm_storage_account.function.primary_access_key

  site_config {
    application_stack {
      python_version = "3.11"
    }
  }

  # NOTE(review): SEARCH_ADMIN_KEY and SERVICEBUS_CONNECTION are secrets held
  # as plain app settings (and therefore in Terraform state). The connection
  # string is the namespace-wide default (root) rule rather than a queue-scoped
  # listen-only rule. Consider Key Vault references or a managed identity
  # before using this outside a demo.
  app_settings = {
    SEARCH_SERVICE_NAME   = azurerm_search_service.this.name
    SEARCH_ADMIN_KEY      = azurerm_search_service.this.primary_key
    INDEXER_NAME          = var.indexer_name
    SERVICEBUS_CONNECTION = azurerm_servicebus_namespace.this.default_primary_connection_string
  }
}
# sync/function_app.tf
#
# Hosting plan and Linux Function App that consume the "indexer-sync" queue.

# Linux hosting plan for the sync function.
resource "azurerm_service_plan" "sync" {
  name                = "${var.environment}-sync-plan"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  os_type             = "Linux"
  sku_name            = "Y1" # Consumption plan
}

# Python 3.11 Function App; its settings are read by the function code as
# environment variables.
resource "azurerm_linux_function_app" "indexer_sync" {
  name                = "${var.environment}-indexer-sync"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  service_plan_id     = azurerm_service_plan.sync.id

  storage_account_name       = azurerm_storage_account.function.name
  storage_account_access_key = azurerm_storage_account.function.primary_access_key

  site_config {
    application_stack {
      python_version = "3.11"
    }
  }

  # NOTE(review): SEARCH_ADMIN_KEY and SERVICEBUS_CONNECTION are secrets held
  # as plain app settings (and therefore in Terraform state). The connection
  # string is the namespace-wide default (root) rule rather than a queue-scoped
  # listen-only rule. Consider Key Vault references or a managed identity
  # before using this outside a demo.
  app_settings = {
    SEARCH_SERVICE_NAME   = azurerm_search_service.this.name
    SEARCH_ADMIN_KEY      = azurerm_search_service.this.primary_key
    INDEXER_NAME          = var.indexer_name
    SERVICEBUS_CONNECTION = azurerm_servicebus_namespace.this.default_primary_connection_string
  }
}
# sync/function_app.tf
#
# Hosting plan and Linux Function App that consume the "indexer-sync" queue.

# Linux hosting plan for the sync function.
resource "azurerm_service_plan" "sync" {
  name                = "${var.environment}-sync-plan"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  os_type             = "Linux"
  sku_name            = "Y1" # Consumption plan
}

# Python 3.11 Function App; its settings are read by the function code as
# environment variables.
resource "azurerm_linux_function_app" "indexer_sync" {
  name                = "${var.environment}-indexer-sync"
  location            = azurerm_resource_group.this.location
  resource_group_name = azurerm_resource_group.this.name
  service_plan_id     = azurerm_service_plan.sync.id

  storage_account_name       = azurerm_storage_account.function.name
  storage_account_access_key = azurerm_storage_account.function.primary_access_key

  site_config {
    application_stack {
      python_version = "3.11"
    }
  }

  # NOTE(review): SEARCH_ADMIN_KEY and SERVICEBUS_CONNECTION are secrets held
  # as plain app settings (and therefore in Terraform state). The connection
  # string is the namespace-wide default (root) rule rather than a queue-scoped
  # listen-only rule. Consider Key Vault references or a managed identity
  # before using this outside a demo.
  app_settings = {
    SEARCH_SERVICE_NAME   = azurerm_search_service.this.name
    SEARCH_ADMIN_KEY      = azurerm_search_service.this.primary_key
    INDEXER_NAME          = var.indexer_name
    SERVICEBUS_CONNECTION = azurerm_servicebus_namespace.this.default_primary_connection_string
  }
}
# sync/function_code/function_app.py
import logging
import os

import azure.functions as func
import requests

# (connect, read) timeout in seconds for the Azure AI Search call. Without a
# timeout, requests can block indefinitely; keeping it well below the queue's
# 2-minute lock lets a stalled call fail and be retried.
_REQUEST_TIMEOUT = (5, 30)

app = func.FunctionApp()


@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="indexer-sync",
    connection="SERVICEBUS_CONNECTION",
)
def run_indexer(msg: func.ServiceBusMessage):
    """Trigger an Azure AI Search indexer run for a blob-event message.

    Reads SEARCH_SERVICE_NAME, SEARCH_ADMIN_KEY and INDEXER_NAME from the
    environment and POSTs to the indexer's ``/run`` endpoint.

    Args:
        msg: Service Bus message from the "indexer-sync" queue. Its body is
            only logged (first 200 characters); it does not affect the request.

    Raises:
        KeyError: If a required environment variable is missing.
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the service answers with anything other than
            202 (run accepted) or 409 (indexer already running).

    Any raised exception fails the invocation, so the message is left for
    redelivery by Service Bus.
    """
    service_name = os.environ["SEARCH_SERVICE_NAME"]
    admin_key = os.environ["SEARCH_ADMIN_KEY"]
    indexer_name = os.environ["INDEXER_NAME"]

    # The payload is used for logging only, so undecodable bytes are replaced
    # rather than allowed to fail an otherwise valid trigger.
    event_data = msg.get_body().decode("utf-8", errors="replace")
    logging.info("Blob event received: %s", event_data[:200])

    # Call the Run Indexer REST API
    url = (
        f"https://{service_name}.search.windows.net"
        f"/indexers/{indexer_name}/run"
        f"?api-version=2024-07-01"
    )
    headers = {
        "api-key": admin_key,
        "Content-Type": "application/json",
    }

    response = requests.post(url, headers=headers, timeout=_REQUEST_TIMEOUT)

    if response.status_code == 202:
        logging.info("Indexer run triggered: %s", indexer_name)
    elif response.status_code == 409:
        # Indexer already running - safe to skip
        logging.info("Indexer already running, skipping")
    else:
        logging.error(
            "Failed to trigger indexer: %s %s",
            response.status_code,
            response.text,
        )
        raise RuntimeError(f"Indexer run failed: {response.status_code}")
# sync/function_code/function_app.py
import logging
import os

import azure.functions as func
import requests

# (connect, read) timeout in seconds for the Azure AI Search call. Without a
# timeout, requests can block indefinitely; keeping it well below the queue's
# 2-minute lock lets a stalled call fail and be retried.
_REQUEST_TIMEOUT = (5, 30)

app = func.FunctionApp()


@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="indexer-sync",
    connection="SERVICEBUS_CONNECTION",
)
def run_indexer(msg: func.ServiceBusMessage):
    """Trigger an Azure AI Search indexer run for a blob-event message.

    Reads SEARCH_SERVICE_NAME, SEARCH_ADMIN_KEY and INDEXER_NAME from the
    environment and POSTs to the indexer's ``/run`` endpoint.

    Args:
        msg: Service Bus message from the "indexer-sync" queue. Its body is
            only logged (first 200 characters); it does not affect the request.

    Raises:
        KeyError: If a required environment variable is missing.
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the service answers with anything other than
            202 (run accepted) or 409 (indexer already running).

    Any raised exception fails the invocation, so the message is left for
    redelivery by Service Bus.
    """
    service_name = os.environ["SEARCH_SERVICE_NAME"]
    admin_key = os.environ["SEARCH_ADMIN_KEY"]
    indexer_name = os.environ["INDEXER_NAME"]

    # The payload is used for logging only, so undecodable bytes are replaced
    # rather than allowed to fail an otherwise valid trigger.
    event_data = msg.get_body().decode("utf-8", errors="replace")
    logging.info("Blob event received: %s", event_data[:200])

    # Call the Run Indexer REST API
    url = (
        f"https://{service_name}.search.windows.net"
        f"/indexers/{indexer_name}/run"
        f"?api-version=2024-07-01"
    )
    headers = {
        "api-key": admin_key,
        "Content-Type": "application/json",
    }

    response = requests.post(url, headers=headers, timeout=_REQUEST_TIMEOUT)

    if response.status_code == 202:
        logging.info("Indexer run triggered: %s", indexer_name)
    elif response.status_code == 409:
        # Indexer already running - safe to skip
        logging.info("Indexer already running, skipping")
    else:
        logging.error(
            "Failed to trigger indexer: %s %s",
            response.status_code,
            response.text,
        )
        raise RuntimeError(f"Indexer run failed: {response.status_code}")
# sync/function_code/function_app.py
import logging
import os

import azure.functions as func
import requests

# (connect, read) timeout in seconds for the Azure AI Search call. Without a
# timeout, requests can block indefinitely; keeping it well below the queue's
# 2-minute lock lets a stalled call fail and be retried.
_REQUEST_TIMEOUT = (5, 30)

app = func.FunctionApp()


@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="indexer-sync",
    connection="SERVICEBUS_CONNECTION",
)
def run_indexer(msg: func.ServiceBusMessage):
    """Trigger an Azure AI Search indexer run for a blob-event message.

    Reads SEARCH_SERVICE_NAME, SEARCH_ADMIN_KEY and INDEXER_NAME from the
    environment and POSTs to the indexer's ``/run`` endpoint.

    Args:
        msg: Service Bus message from the "indexer-sync" queue. Its body is
            only logged (first 200 characters); it does not affect the request.

    Raises:
        KeyError: If a required environment variable is missing.
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the service answers with anything other than
            202 (run accepted) or 409 (indexer already running).

    Any raised exception fails the invocation, so the message is left for
    redelivery by Service Bus.
    """
    service_name = os.environ["SEARCH_SERVICE_NAME"]
    admin_key = os.environ["SEARCH_ADMIN_KEY"]
    indexer_name = os.environ["INDEXER_NAME"]

    # The payload is used for logging only, so undecodable bytes are replaced
    # rather than allowed to fail an otherwise valid trigger.
    event_data = msg.get_body().decode("utf-8", errors="replace")
    logging.info("Blob event received: %s", event_data[:200])

    # Call the Run Indexer REST API
    url = (
        f"https://{service_name}.search.windows.net"
        f"/indexers/{indexer_name}/run"
        f"?api-version=2024-07-01"
    )
    headers = {
        "api-key": admin_key,
        "Content-Type": "application/json",
    }

    response = requests.post(url, headers=headers, timeout=_REQUEST_TIMEOUT)

    if response.status_code == 202:
        logging.info("Indexer run triggered: %s", indexer_name)
    elif response.status_code == 409:
        # Indexer already running - safe to skip
        logging.info("Indexer already running, skipping")
    else:
        logging.error(
            "Failed to trigger indexer: %s %s",
            response.status_code,
            response.text,
        )
        raise RuntimeError(f"Indexer run failed: {response.status_code}")
{ "dataDeletionDetectionPolicy": { "@odata.type": "#Microsoft.Azure.Search.SoftDeleteColumnDeletionDetectionPolicy", "softDeleteColumnName": "IsDeleted", "softDeleteMarkerValue": "true" }
}
{ "dataDeletionDetectionPolicy": { "@odata.type": "#Microsoft.Azure.Search.SoftDeleteColumnDeletionDetectionPolicy", "softDeleteColumnName": "IsDeleted", "softDeleteMarkerValue": "true" }
}
{ "dataDeletionDetectionPolicy": { "@odata.type": "#Microsoft.Azure.Search.SoftDeleteColumnDeletionDetectionPolicy", "softDeleteColumnName": "IsDeleted", "softDeleteMarkerValue": "true" }
}
{ "name": "my-blob-indexer", "dataSourceName": "blob-datasource", "targetIndexName": "rag-index", "schedule": { "interval": "PT5M" }
}
{ "name": "my-blob-indexer", "dataSourceName": "blob-datasource", "targetIndexName": "rag-index", "schedule": { "interval": "PT5M" }
}
{ "name": "my-blob-indexer", "dataSourceName": "blob-datasource", "targetIndexName": "rag-index", "schedule": { "interval": "PT5M" }
}

- Post 1: Azure AI Search RAG - Basic Setup 🔍
- Post 2: Advanced RAG - Three-Layer Retrieval 🧠
- Post 3: Cosmos DB Vector Search - NoSQL-Native RAG 💰
- Post 4: Auto-Sync Pipeline (you are here) ⚡