+-------------+          +-------------+
| Service A   |--------->| Service B   |
| (no         |   HTTP   | (strict     |
| validation) |          | validation) |
+------+------+          +------+------+
       |                        |
       | logs errors            | rejects bad data
       v                        v
+----------------------------------+
|          AI Orchestrator         |
|                                  |
|  1. tail logs from all services  |
|  2. regex match on error pattern |
|  3. build prompt with context    |
|  4. call LLM for a code fix      |
|  5. apply patch, rebuild, verify |
+----------------------------------+
                 |
                 v
           +-----------+
           |  MongoDB  |
           +-----------+
# Stream logs from all monitored services
docker compose logs --tail 0 -f service_a service_b mongodb | while read -r line; do
  # Append to a rolling buffer (keeps last N lines for context)
  echo "$line" >> "$BUFFER_FILE"

  # Check if this line matches our error pattern
  if echo "$line" | grep -Eq "$ERROR_REGEX"; then
    # Hash the line to avoid retriggering on the same error
    signature=$(echo "$line" | sha256sum | awk '{print $1}')
    if should_trigger "$signature"; then
      echo "Detected error. Triggering AI fix."
      run_ai_fix "$line" "$signature"
    fi
  fi
done
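The `should_trigger` helper isn't shown in the loop above. A minimal sketch of the same idea in Python, debouncing by error signature with a cooldown window (the names, cooldown value, and in-memory store are illustrative assumptions, not the actual implementation):

```python
import hashlib
import time
from typing import Optional

_SEEN: dict = {}          # signature -> timestamp of last trigger
COOLDOWN_SECONDS = 600    # don't re-fix the same error within 10 minutes

def signature_of(line: str) -> str:
    """Hash a log line so identical errors share one signature."""
    return hashlib.sha256(line.encode()).hexdigest()

def should_trigger(signature: str, now: Optional[float] = None) -> bool:
    """Return True only if this signature hasn't fired recently."""
    now = time.time() if now is None else now
    last = _SEEN.get(signature)
    if last is not None and now - last < COOLDOWN_SECONDS:
        return False
    _SEEN[signature] = now
    return True
```

The point of the cooldown is to stop a crash-loop from spawning a new LLM fix attempt for every repeated log line while a previous fix is still building.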
You are debugging a running backend service inside Docker.

Detected error pattern: transfer_remote_rejections
Matched log line: [truncated to ~1400 chars]

Recent log context (last 30 lines):
Task:
- The receiving service rejects records with unexpected payload shapes.
- Fix the validation/normalization code to handle these variants:
  - Numbers wrapped as {"$numberInt": "42"} or {"$numberLong": "999"}
  - Object keys with inconsistent casing (e.g., "Category" vs "category")
  - Nested objects serialized as JSON strings instead of dicts
  - Nested objects sent as a list of {key, value} pairs
- Only modify the receiving service's code. Preserve the API contract.
- Rebuild the container, run the transfer again, verify counts.
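To make the four variants in the task concrete, here are hypothetical payload fragments for each shape (field names match the examples in the prompt; the values are made up):

```python
# Four malformed shapes the receiving service must accept.
variants = {
    # 1. Numbers wrapped in MongoDB extended JSON
    "wrapped_number": {"long_value": {"$numberLong": "999"}},
    # 2. Inconsistent key casing
    "mixed_case": {"object_values": {"Category": "ALPHA"}},
    # 3. Nested object serialized as a JSON string
    "json_string": {"object_values": '{"category": "ALPHA"}'},
    # 4. Nested object as a list of {key, value} pairs
    "kv_pairs": {"object_values": [{"key": "category", "value": "ALPHA"}]},
}
```

All four should normalize to the same canonical shape: plain ints and lowercase-keyed dicts.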
MAX_RETRIES=3
attempt=1

while [ "$attempt" -le "$MAX_RETRIES" ]; do
  echo "Fix attempt $attempt/$MAX_RETRIES"

  # Run the LLM with a timeout
  timeout 900 run_llm_fix < "$PROMPT_FILE"
  exit_code=$?

  if [ "$exit_code" -eq 0 ]; then
    echo "Fix succeeded on attempt $attempt"
    break
  fi

  attempt=$((attempt + 1))
  sleep 2
done
# docker-compose.yml (simplified)
services:
  ai_orchestrator:
    volumes:
      - ssh_keys:/shared-keys      # writes the keypair here
      - /var/run/docker.sock:/var/run/docker.sock
  service_a:
    volumes:
      - ssh_keys:/shared-keys:ro   # reads the public key
  service_b:
    volumes:
      - ssh_keys:/shared-keys:ro

volumes:
  ssh_keys:
ssh service_a "tail -n 50 /var/log/app/service.log"
ssh service_b "cat /app/main.py"
#!/bin/sh
LOG_FILE="/var/log/app/service.log"
mkdir -p "$(dirname "$LOG_FILE")"

# Run the actual command, redirect all output to the log file
"$@" >> "$LOG_FILE" 2>&1 &
MAIN_PID=$!

# Tail the log file to stdout (so docker logs still works)
tail -n +1 -F "$LOG_FILE" &
TAIL_PID=$!

wait "$MAIN_PID"
kill "$TAIL_PID" 2>/dev/null
{
  "long_value": {"$numberLong": "900000000000000001"},
  "object_values": "{\"category\": \"ALPHA\", \"quality\": \"HIGH\", \"multiplier\": 2}"
}
import json

def normalize_payload(raw: dict) -> dict:
    """Unwrap MongoDB extended JSON and normalize shapes."""
    # Handle {"$numberLong": "..."} and {"$numberInt": "..."} wrappers
    for field in ["long_value", "short_value", "integer_value"]:
        val = raw.get(field)
        if isinstance(val, dict):
            raw[field] = int(val.get("$numberLong") or val.get("$numberInt", 0))

    # Handle object_values as a JSON string
    obj = raw.get("object_values")
    if isinstance(obj, str):
        obj = json.loads(obj)
        raw["object_values"] = obj

    # Handle object_values as [{key, value}, ...] list
    if isinstance(obj, list):
        raw["object_values"] = {item["key"]: item["value"] for item in obj}
        obj = raw["object_values"]

    # Normalize mixed-case keys
    if isinstance(obj, dict):
        normalized = {k.lower(): v for k, v in obj.items()}
        raw["object_values"] = normalized

    return raw
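A quick sanity check against a record like the one from the failing transfer. The function is repeated here so the snippet runs standalone; the input record is a hand-built example, not captured production data:

```python
import json

def normalize_payload(raw: dict) -> dict:
    """Unwrap MongoDB extended JSON and normalize shapes."""
    # Unwrap {"$numberLong": "..."} / {"$numberInt": "..."}
    for field in ["long_value", "short_value", "integer_value"]:
        val = raw.get(field)
        if isinstance(val, dict):
            raw[field] = int(val.get("$numberLong") or val.get("$numberInt", 0))

    # object_values as a JSON string -> dict
    obj = raw.get("object_values")
    if isinstance(obj, str):
        obj = json.loads(obj)
        raw["object_values"] = obj

    # object_values as [{key, value}, ...] -> dict
    if isinstance(obj, list):
        raw["object_values"] = {item["key"]: item["value"] for item in obj}
        obj = raw["object_values"]

    # Lowercase mixed-case keys
    if isinstance(obj, dict):
        raw["object_values"] = {k.lower(): v for k, v in obj.items()}

    return raw

record = {
    "long_value": {"$numberLong": "900000000000000001"},
    "object_values": '{"Category": "ALPHA", "quality": "HIGH", "multiplier": 2}',
}
out = normalize_payload(record)
# The wrapped long becomes a plain int, and the stringified object
# becomes a dict with lowercase keys.
assert out["long_value"] == 900000000000000001
assert out["object_values"] == {"category": "ALPHA", "quality": "HIGH", "multiplier": 2}
```

Note the int stays exact: 900000000000000001 is past the 2^53 range where JSON float round-tripping would silently corrupt it, which is presumably why the source wraps it as `$numberLong` in the first place.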
source_total: 1000
transferred: 996
rejected: 4
source_total: 1000
transferred: 1000
rejected: 0
- Service A holds raw records. It doesn't validate them; it just stores whatever it gets.
- Service B is the strict one. It receives records from Service A, validates every field against business rules, and writes the good ones to a separate database. Bad records get rejected.
- The AI container sits alongside them. It has access to the Docker socket, can SSH into the other containers, and tails their logs in real time.

- Tell the model exactly which file to modify. Don't let it go exploring the whole repo. In my case, the fix always lives in the receiving service's main application file.
- List the variant shapes explicitly. The model can't guess what "malformed" means in your context. Be specific about what the data looks like and what it should be normalized into.
- Include the verification step. The prompt doesn't just say "fix the code"; it says "fix the code, rebuild, re-run the transfer, check the counts." The AI needs to know when it's done.

- Staging environments where you want fast iteration on integration bugs
- Demo environments that need to self-recover when data gets messy
- Data pipelines where upstream systems send unpredictable payloads and you need the receiving end to adapt
- Internal tools where the cost of an hour of downtime is higher than the risk of an automated fix

- 2x FastAPI services (Python, one stores data, one validates it)
- 1x Init container (seeds test data with intentional malformed records)
- 1x AI orchestrator (tails logs, calls LLM, applies fixes)