Tools
Tools: 3 Production Cases: How I Built AI Systems That Actually Make Money
2026-03-04
0 views
admin
CASE 1: DUBAI YACHT MARKETPLACE ($58K GMV IN 6 WEEKS) ## Key Engineering Challenge: Zero Double-Bookings ## Stripe Connect Escrow: 7 Financial Edge Cases ## Tech Stack ## Results (First 6 Weeks Production) ## CASE 2: MOSCOW CLINIC AI (21% → 8.1% NO-SHOW RATE) ## 152-FZ Compliance: Encrypting Personal Data Without Killing Performance ## Circuit Breaker for 1C:Medicine Integration ## 3-Tier AI Triage Pipeline ## Tech Stack ## Results (First 5 Weeks Production) ## CASE 3: AI DATING PLATFORM (95% REQUESTS INSTANT) ## The Pre-Router: Solving Callback Conflicts in aiogram ## Hybrid AI: 95% Instant, 5% LLM ## Tech Stack ## Technical Metrics ## WHAT'S NEXT? No theory. No "in this article we'll learn". Just 3 shipped projects, real code snippets, and metrics that matter. I'm a Full-Stack Python Developer who ships production systems. Over the last 6 months, I delivered 3 projects across healthcare, luxury tourism, and social tech — all under NDA, all with measurable business results. Here's what I built, how I built it, and the code patterns that made it work. The problem: Dubai's yacht rental market ran on WhatsApp and cash. No transparency, no price guarantees, no protection for either side. The solution: A two-sided marketplace with verified owners, Stripe escrow, Arabic RTL, and iOS+Android+Web from a single Turborepo monorepo. Race conditions are brutal in booking systems. Two users clicking "Reserve" on the same time slot simultaneously — who wins? Result: Zero double-bookings in production across 312 completed trips. Marketplace payments are messy. No-shows, weather cancellations, disputes, partial refunds — each requires different logic. Key insight: Don't trust webhooks alone. Always double-check via API before state transitions. The problem: Private clinic losing 21% of appointments to no-shows. Admins spending 5–6 hours/day on manual scheduling. The solution: Telegram bot + PWA with AI symptom triage, 152-FZ compliance, and 1C:Medicine integration. Russian law requires encryption of personal data. But encrypting every field makes search impossible. Result: Search latency p95 = 47ms (acceptable), full 152-FZ compliance, zero data leaks. External APIs fail. Your system shouldn't. Result: Graceful degradation — when MIS fails, bot offers phone booking instead of crashing. Result: 89.7% correct specialist routing, data stays in Russia (GigaChat API). The problem: Scientific matchmaking needs AI for conversation, but LLM latency kills UX. The solution: 3-tier hybrid architecture with local LLM, RAG, and TTS for voice messages. When you have 9+ routers handling callbacks, conflicts happen. Result: Clean separation of concerns, zero callback conflicts, easy to add new features. Result: 95% of user messages get instant responses. The 5% that need LLM get thoughtful answers — without burning API credits. I'm currently available for new projects — 40+ hours/week, remote, UAE/EU/US timezones. Minimum project size: $3,000 USD
Optimal range: $5,000–15,000 USD per project If you're a founder, CTO, or business owner looking to ship a production-grade system in 6–8 weeks — let's talk. 📧 [email protected]
🌐 grekcreator.com
💬 Telegram: @greknamed All projects delivered under NDA. Case study details verified and available on request. Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse COMMAND_BLOCK:
# services/booking_service.py
import redis
from contextlib import contextmanager
from datetime import datetime class BookingService: def __init__(self, redis_client: redis.Redis): self.redis = redis_client @contextmanager def slot_lock(self, yacht_id: int, slot_start: datetime): """Distributed lock via Redis SET NX — prevents double-booking""" lock_key = f"booking:lock:{yacht_id}:{slot_start.timestamp()}" acquired = self.redis.set(lock_key, "1", nx=True, ex=900) # 15 min TTL if not acquired: raise BookingConflictError("Slot already reserved") try: yield finally: # Only delete if we still hold the lock if self.redis.get(lock_key) == "1": self.redis.delete(lock_key) async def create_booking( self, user_id: int, yacht_id: int, slot: datetime ) -> Booking: with self.slot_lock(yacht_id, slot): # Double-check availability in DB (defense in depth) if not await self._is_slot_available(yacht_id, slot): raise BookingConflictError("Slot taken during lock acquisition") async with db.transaction(): booking = await Booking.create( user_id=user_id, yacht_id=yacht_id, slot_start=slot, status="PENDING" ) # Reserve slot in availability cache await self.redis.setex( f"availability:{yacht_id}:{slot.timestamp()}", 900, "reserved" ) return booking Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# services/booking_service.py
import redis
from contextlib import contextmanager
from datetime import datetime class BookingService: def __init__(self, redis_client: redis.Redis): self.redis = redis_client @contextmanager def slot_lock(self, yacht_id: int, slot_start: datetime): """Distributed lock via Redis SET NX — prevents double-booking""" lock_key = f"booking:lock:{yacht_id}:{slot_start.timestamp()}" acquired = self.redis.set(lock_key, "1", nx=True, ex=900) # 15 min TTL if not acquired: raise BookingConflictError("Slot already reserved") try: yield finally: # Only delete if we still hold the lock if self.redis.get(lock_key) == "1": self.redis.delete(lock_key) async def create_booking( self, user_id: int, yacht_id: int, slot: datetime ) -> Booking: with self.slot_lock(yacht_id, slot): # Double-check availability in DB (defense in depth) if not await self._is_slot_available(yacht_id, slot): raise BookingConflictError("Slot taken during lock acquisition") async with db.transaction(): booking = await Booking.create( user_id=user_id, yacht_id=yacht_id, slot_start=slot, status="PENDING" ) # Reserve slot in availability cache await self.redis.setex( f"availability:{yacht_id}:{slot.timestamp()}", 900, "reserved" ) return booking COMMAND_BLOCK:
# services/booking_service.py
import redis
from contextlib import contextmanager
from datetime import datetime class BookingService: def __init__(self, redis_client: redis.Redis): self.redis = redis_client @contextmanager def slot_lock(self, yacht_id: int, slot_start: datetime): """Distributed lock via Redis SET NX — prevents double-booking""" lock_key = f"booking:lock:{yacht_id}:{slot_start.timestamp()}" acquired = self.redis.set(lock_key, "1", nx=True, ex=900) # 15 min TTL if not acquired: raise BookingConflictError("Slot already reserved") try: yield finally: # Only delete if we still hold the lock if self.redis.get(lock_key) == "1": self.redis.delete(lock_key) async def create_booking( self, user_id: int, yacht_id: int, slot: datetime ) -> Booking: with self.slot_lock(yacht_id, slot): # Double-check availability in DB (defense in depth) if not await self._is_slot_available(yacht_id, slot): raise BookingConflictError("Slot taken during lock acquisition") async with db.transaction(): booking = await Booking.create( user_id=user_id, yacht_id=yacht_id, slot_start=slot, status="PENDING" ) # Reserve slot in availability cache await self.redis.setex( f"availability:{yacht_id}:{slot.timestamp()}", 900, "reserved" ) return booking COMMAND_BLOCK:
# services/payment_service.py
import stripe async def handle_booking_completion(self, booking_id: int): """Capture payment 24h after trip ends — escrow logic""" booking = await Booking.get(booking_id) # Only capture if no dispute opened if booking.dispute_status != "OPEN": await stripe.PaymentIntent.capture( booking.stripe_payment_intent_id, amount_to_capture=booking.amount_captured ) # Split payout: 88% owner, 12% platform await stripe.Transfer.create( amount=int(booking.amount_captured * 0.88), currency="aed", destination=booking.owner_stripe_account_id, source_transaction=booking.stripe_payment_intent_id ) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# services/payment_service.py
import stripe async def handle_booking_completion(self, booking_id: int): """Capture payment 24h after trip ends — escrow logic""" booking = await Booking.get(booking_id) # Only capture if no dispute opened if booking.dispute_status != "OPEN": await stripe.PaymentIntent.capture( booking.stripe_payment_intent_id, amount_to_capture=booking.amount_captured ) # Split payout: 88% owner, 12% platform await stripe.Transfer.create( amount=int(booking.amount_captured * 0.88), currency="aed", destination=booking.owner_stripe_account_id, source_transaction=booking.stripe_payment_intent_id ) COMMAND_BLOCK:
# services/payment_service.py
import stripe async def handle_booking_completion(self, booking_id: int): """Capture payment 24h after trip ends — escrow logic""" booking = await Booking.get(booking_id) # Only capture if no dispute opened if booking.dispute_status != "OPEN": await stripe.PaymentIntent.capture( booking.stripe_payment_intent_id, amount_to_capture=booking.amount_captured ) # Split payout: 88% owner, 12% platform await stripe.Transfer.create( amount=int(booking.amount_captured * 0.88), currency="aed", destination=booking.owner_stripe_account_id, source_transaction=booking.stripe_payment_intent_id ) COMMAND_BLOCK:
# models/patient.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
import hashlib class Patient(Base): __tablename__ = "patients" id = Column(Integer, primary_key=True) # Encrypted fields (pgcrypto AES-256) _full_name = Column("full_name", String(255)) _phone = Column("phone", String(20)) # Hash fields for search (SHA-256, indexed) phone_hash = Column(String(64), index=True) @hybrid_property def phone(self) -> str: """Decrypt on read — only when needed""" if not self._phone: return None return pg_decrypt(self._phone, get_encryption_key()) @phone.setter def phone(self, value: str): """Encrypt on write + update hash for search""" self._phone = pg_encrypt(value, get_encryption_key()) self.phone_hash = hashlib.sha256(value.encode()).hexdigest() @classmethod async def find_by_phone(cls, phone: str): """Search via hash — no decryption needed""" phone_hash = hashlib.sha256(phone.encode()).hexdigest() return await cls.query.filter(cls.phone_hash == phone_hash).first() Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# models/patient.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
import hashlib class Patient(Base): __tablename__ = "patients" id = Column(Integer, primary_key=True) # Encrypted fields (pgcrypto AES-256) _full_name = Column("full_name", String(255)) _phone = Column("phone", String(20)) # Hash fields for search (SHA-256, indexed) phone_hash = Column(String(64), index=True) @hybrid_property def phone(self) -> str: """Decrypt on read — only when needed""" if not self._phone: return None return pg_decrypt(self._phone, get_encryption_key()) @phone.setter def phone(self, value: str): """Encrypt on write + update hash for search""" self._phone = pg_encrypt(value, get_encryption_key()) self.phone_hash = hashlib.sha256(value.encode()).hexdigest() @classmethod async def find_by_phone(cls, phone: str): """Search via hash — no decryption needed""" phone_hash = hashlib.sha256(phone.encode()).hexdigest() return await cls.query.filter(cls.phone_hash == phone_hash).first() COMMAND_BLOCK:
# models/patient.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property
import hashlib class Patient(Base): __tablename__ = "patients" id = Column(Integer, primary_key=True) # Encrypted fields (pgcrypto AES-256) _full_name = Column("full_name", String(255)) _phone = Column("phone", String(20)) # Hash fields for search (SHA-256, indexed) phone_hash = Column(String(64), index=True) @hybrid_property def phone(self) -> str: """Decrypt on read — only when needed""" if not self._phone: return None return pg_decrypt(self._phone, get_encryption_key()) @phone.setter def phone(self, value: str): """Encrypt on write + update hash for search""" self._phone = pg_encrypt(value, get_encryption_key()) self.phone_hash = hashlib.sha256(value.encode()).hexdigest() @classmethod async def find_by_phone(cls, phone: str): """Search via hash — no decryption needed""" phone_hash = hashlib.sha256(phone.encode()).hexdigest() return await cls.query.filter(cls.phone_hash == phone_hash).first() COMMAND_BLOCK:
# services/mis_client.py
from enum import Enum
import time class CircuitState(Enum): CLOSED = "closed" # Normal operation OPEN = "open" # Failing — reject requests HALF_OPEN = "half_open" # Testing recovery class MISClient: def __init__(self, base_url: str): self.base_url = base_url self.state = CircuitState.CLOSED self.failure_count = 0 self.last_failure_time = None async def call(self, endpoint: str, **kwargs): if self.state == CircuitState.OPEN: # Check if recovery window passed if time.time() - self.last_failure_time > 300: self.state = CircuitState.HALF_OPEN else: raise MISUnavailableError("Try phone booking") try: result = await self._make_request(endpoint, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): self.failure_count = 0 self.state = CircuitState.CLOSED def _on_failure(self): self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= 3: self.state = CircuitState.OPEN Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# services/mis_client.py
from enum import Enum
import time class CircuitState(Enum): CLOSED = "closed" # Normal operation OPEN = "open" # Failing — reject requests HALF_OPEN = "half_open" # Testing recovery class MISClient: def __init__(self, base_url: str): self.base_url = base_url self.state = CircuitState.CLOSED self.failure_count = 0 self.last_failure_time = None async def call(self, endpoint: str, **kwargs): if self.state == CircuitState.OPEN: # Check if recovery window passed if time.time() - self.last_failure_time > 300: self.state = CircuitState.HALF_OPEN else: raise MISUnavailableError("Try phone booking") try: result = await self._make_request(endpoint, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): self.failure_count = 0 self.state = CircuitState.CLOSED def _on_failure(self): self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= 3: self.state = CircuitState.OPEN COMMAND_BLOCK:
# services/mis_client.py
from enum import Enum
import time class CircuitState(Enum): CLOSED = "closed" # Normal operation OPEN = "open" # Failing — reject requests HALF_OPEN = "half_open" # Testing recovery class MISClient: def __init__(self, base_url: str): self.base_url = base_url self.state = CircuitState.CLOSED self.failure_count = 0 self.last_failure_time = None async def call(self, endpoint: str, **kwargs): if self.state == CircuitState.OPEN: # Check if recovery window passed if time.time() - self.last_failure_time > 300: self.state = CircuitState.HALF_OPEN else: raise MISUnavailableError("Try phone booking") try: result = await self._make_request(endpoint, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): self.failure_count = 0 self.state = CircuitState.CLOSED def _on_failure(self): self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= 3: self.state = CircuitState.OPEN COMMAND_BLOCK:
# services/triage_service.py
class TriageService: async def triage(self, symptoms: str) -> str: # L1: Keyword Router (0ms, 78% of requests) if route := self.keyword_router.match(symptoms): return route # L2: RAG + Cache (~100ms, 17% of requests) if response := await self.rag_service.query(symptoms): return response # L3: GigaChat API (2–6s, 5% of requests) return await self.llm.generate(symptoms) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# services/triage_service.py
class TriageService: async def triage(self, symptoms: str) -> str: # L1: Keyword Router (0ms, 78% of requests) if route := self.keyword_router.match(symptoms): return route # L2: RAG + Cache (~100ms, 17% of requests) if response := await self.rag_service.query(symptoms): return response # L3: GigaChat API (2–6s, 5% of requests) return await self.llm.generate(symptoms) COMMAND_BLOCK:
# services/triage_service.py
class TriageService: async def triage(self, symptoms: str) -> str: # L1: Keyword Router (0ms, 78% of requests) if route := self.keyword_router.match(symptoms): return route # L2: RAG + Cache (~100ms, 17% of requests) if response := await self.rag_service.query(symptoms): return response # L3: GigaChat API (2–6s, 5% of requests) return await self.llm.generate(symptoms) COMMAND_BLOCK:
# routers/pre_router.py
from aiogram import Router
from aiogram.types import CallbackQuery class PreRouter(Router): """Routes callbacks before they hit specific routers — prevents conflicts""" def __init__(self): super().__init__() # Pattern-based routing: callback data starts with prefix self.patterns = { "somatype_": self._route_somatype, "pc_": self._route_pc, "meeting_": self._route_meeting, "payment_": self._route_payment, } async def process_callback(self, callback: CallbackQuery): data = callback.data for prefix, handler in self.patterns.items(): if data.startswith(prefix): return await handler(callback) # Fallback: let other routers try return await super().process_callback(callback) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# routers/pre_router.py
from aiogram import Router
from aiogram.types import CallbackQuery class PreRouter(Router): """Routes callbacks before they hit specific routers — prevents conflicts""" def __init__(self): super().__init__() # Pattern-based routing: callback data starts with prefix self.patterns = { "somatype_": self._route_somatype, "pc_": self._route_pc, "meeting_": self._route_meeting, "payment_": self._route_payment, } async def process_callback(self, callback: CallbackQuery): data = callback.data for prefix, handler in self.patterns.items(): if data.startswith(prefix): return await handler(callback) # Fallback: let other routers try return await super().process_callback(callback) COMMAND_BLOCK:
# routers/pre_router.py
from aiogram import Router
from aiogram.types import CallbackQuery class PreRouter(Router): """Routes callbacks before they hit specific routers — prevents conflicts""" def __init__(self): super().__init__() # Pattern-based routing: callback data starts with prefix self.patterns = { "somatype_": self._route_somatype, "pc_": self._route_pc, "meeting_": self._route_meeting, "payment_": self._route_payment, } async def process_callback(self, callback: CallbackQuery): data = callback.data for prefix, handler in self.patterns.items(): if data.startswith(prefix): return await handler(callback) # Fallback: let other routers try return await super().process_callback(callback) COMMAND_BLOCK:
# services/hybrid_ai.py
import asyncio
import hashlib
from llama_cpp import Llama class HybridAIAgent: def __init__(self): self.keyword_router = KeywordRouter() self.cache = ResponseCache(max_size=200) self.llm = Llama(model_path="Qwen2.5-3B-Instruct-Q4_K_M") async def respond(self, user_id: int, text: str) -> str: # Tier 1: Keyword Router (0ms) if response := await self.keyword_router.route(text): return response # Tier 2: Cache (0.001s) cache_key = self._normalize(text) if cached := await self.cache.get(cache_key): return cached # Tier 3: LLM + RAG (6–19s, but only 5% of requests) rag_context = await self.rag_service.get_relevant_chunks(text) prompt = self._build_prompt(text, rag_context) response = await asyncio.wait_for( asyncio.to_thread(self.llm.generate, prompt), timeout=15.0 ) # Cache successful responses await self.cache.set(cache_key, response, ttl=3600) return response def _normalize(self, text: str) -> str: """MD5-normalize for cache: identical questions = same key""" normalized = re.sub(r"[^\w\s]", "", text.lower().strip()) return hashlib.md5(normalized.encode()).hexdigest() Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK:
# services/hybrid_ai.py
import asyncio
import hashlib
from llama_cpp import Llama class HybridAIAgent: def __init__(self): self.keyword_router = KeywordRouter() self.cache = ResponseCache(max_size=200) self.llm = Llama(model_path="Qwen2.5-3B-Instruct-Q4_K_M") async def respond(self, user_id: int, text: str) -> str: # Tier 1: Keyword Router (0ms) if response := await self.keyword_router.route(text): return response # Tier 2: Cache (0.001s) cache_key = self._normalize(text) if cached := await self.cache.get(cache_key): return cached # Tier 3: LLM + RAG (6–19s, but only 5% of requests) rag_context = await self.rag_service.get_relevant_chunks(text) prompt = self._build_prompt(text, rag_context) response = await asyncio.wait_for( asyncio.to_thread(self.llm.generate, prompt), timeout=15.0 ) # Cache successful responses await self.cache.set(cache_key, response, ttl=3600) return response def _normalize(self, text: str) -> str: """MD5-normalize for cache: identical questions = same key""" normalized = re.sub(r"[^\w\s]", "", text.lower().strip()) return hashlib.md5(normalized.encode()).hexdigest() COMMAND_BLOCK:
# services/hybrid_ai.py
import asyncio
import hashlib
from llama_cpp import Llama class HybridAIAgent: def __init__(self): self.keyword_router = KeywordRouter() self.cache = ResponseCache(max_size=200) self.llm = Llama(model_path="Qwen2.5-3B-Instruct-Q4_K_M") async def respond(self, user_id: int, text: str) -> str: # Tier 1: Keyword Router (0ms) if response := await self.keyword_router.route(text): return response # Tier 2: Cache (0.001s) cache_key = self._normalize(text) if cached := await self.cache.get(cache_key): return cached # Tier 3: LLM + RAG (6–19s, but only 5% of requests) rag_context = await self.rag_service.get_relevant_chunks(text) prompt = self._build_prompt(text, rag_context) response = await asyncio.wait_for( asyncio.to_thread(self.llm.generate, prompt), timeout=15.0 ) # Cache successful responses await self.cache.set(cache_key, response, ttl=3600) return response def _normalize(self, text: str) -> str: """MD5-normalize for cache: identical questions = same key""" normalized = re.sub(r"[^\w\s]", "", text.lower().strip()) return hashlib.md5(normalized.encode()).hexdigest() - Backend: Python 3.12, FastAPI, Celery 5 + Redis, SQLAlchemy 2.0 async
- Frontend: React 18 + TypeScript + Zustand + Tailwind + Mapbox GL JS
- Mobile: React Native 0.73 + Expo SDK 50 (Turborepo monorepo)
- Infra: AWS ECS Fargate, RDS Multi-AZ, ElastiCache Redis, Terraform
- Payments: Stripe Connect Marketplace (escrow, split payments, disputes)
- KYC: Sumsub API (UAE-specific: DCCA license, Emirates ID)
- AI: GPT-4o-mini Route Advisor (EN/AR), ML Price Recommendation - Bot: Python 3.12, aiogram 3.4 async, APScheduler 3.10 (12 jobs)
- Backend: FastAPI + SQLAlchemy 2.0 async, pgcrypto AES-256
- AI: Keyword Router (94 patterns) → ChromaDB RAG + rubert-tiny2 → GigaChat API
- PWA: React 18 + TypeScript + Vite, Service Worker offline cache, Web Push API
- Integration: 1C:Medicine REST API + circuit breaker, 1C:Buhgalteriya export
- Infra: Selectel VPS (Saint Petersburg, RF), LUKS full disk encryption - Bot: Python 3.11, aiogram 3 async, FSM state machines
- Web: FastAPI + Jinja2 + vanilla JS 15KB (somatype test)
- AI: 3-tier Hybrid (L1: 154 patterns → L2: LRU Cache MD5 → L3: Qwen2.5-3B + ChromaDB RAG)
- TTS: Long AI responses synthesized as voice messages
- DB: PostgreSQL 16 native, RPC find_match() — full matching algorithm in database
- Payments: Alfa-Bank REST API (HMAC webhook verification, idempotency)
- Infra: Beget VPS, systemd services, Nginx SSL, 24 security measures - AI-powered automation systems (chatbots, triage, routing)
- Cross-platform marketplaces (iOS + Android + Web from monorepo)
- Enterprise backend with compliance (152-FZ, GDPR, healthcare, fintech)
how-totutorialguidedev.toaimlllmgptsystemdroutingrouterpostgresqlnginxpythonssl