3 Production Cases: How I Built AI Systems That Actually Make Money

No theory. No "in this article we'll learn". Just 3 shipped projects, real code snippets, and metrics that matter.

I'm a Full-Stack Python Developer who ships production systems. Over the last 6 months, I delivered 3 projects across healthcare, luxury tourism, and social tech — all under NDA, all with measurable business results. Here's what I built, how I built it, and the code patterns that made it work.

## CASE 1: DUBAI YACHT MARKETPLACE ($58K GMV IN 6 WEEKS)

The problem: Dubai's yacht rental market ran on WhatsApp and cash. No transparency, no price guarantees, no protection for either side.

The solution: A two-sided marketplace with verified owners, Stripe escrow, Arabic RTL, and iOS + Android + Web from a single Turborepo monorepo.

## Key Engineering Challenge: Zero Double-Bookings

Race conditions are brutal in booking systems. Two users click "Reserve" on the same time slot at the same instant — who wins?

Result: Zero double-bookings in production across 312 completed trips.

## Stripe Connect Escrow: 7 Financial Edge Cases

Marketplace payments are messy. No-shows, weather cancellations, disputes, partial refunds — each requires different logic.

Key insight: Don't trust webhooks alone. Always double-check via the API before state transitions.

## CASE 2: MOSCOW CLINIC AI (21% → 8.1% NO-SHOW RATE)

The problem: A private clinic was losing 21% of appointments to no-shows, with admins spending 5–6 hours/day on manual scheduling.

The solution: A Telegram bot + PWA with AI symptom triage, 152-FZ compliance, and 1C:Medicine integration.
## 152-FZ Compliance: Encrypting Personal Data Without Killing Performance

Russian law (152-FZ) requires personal data to be encrypted. But encrypting every field makes search impossible. The way out: encrypt the values, and index a SHA-256 hash alongside them for lookups.

Result: Search latency p95 = 47ms (acceptable), full 152-FZ compliance, zero data leaks.

## Circuit Breaker for 1C:Medicine Integration

External APIs fail. Your system shouldn't.

Result: Graceful degradation — when the MIS fails, the bot offers phone booking instead of crashing.

## 3-Tier AI Triage Pipeline

Result: 89.7% correct specialist routing, with data staying in Russia (GigaChat API).

## CASE 3: AI DATING PLATFORM (95% REQUESTS INSTANT)

The problem: Scientific matchmaking needs AI for conversation, but LLM latency kills UX.

The solution: A 3-tier hybrid architecture with a local LLM, RAG, and TTS for voice messages.

## The Pre-Router: Solving Callback Conflicts in aiogram

When you have 9+ routers handling callbacks, conflicts happen.

Result: Clean separation of concerns, zero callback conflicts, and new features are easy to add.

## Hybrid AI: 95% Instant, 5% LLM

Result: 95% of user messages get instant responses. The 5% that need the LLM get thoughtful answers — without burning API credits.

## WHAT'S NEXT?

I'm currently available for new projects — 40+ hours/week, remote, UAE/EU/US timezones.

- Minimum project size: $3,000 USD
- Optimal range: $5,000–15,000 USD per project

If you're a founder, CTO, or business owner looking to ship a production-grade system in 6–8 weeks — let's talk.

📧 [email protected]
🌐 grekcreator.com
💬 Telegram: @greknamed

All projects delivered under NDA. Case study details verified and available on request.
Case 1: the distributed slot lock that prevents double-bookings. (The lock value is a unique token, so an expired lock can never delete a competitor's lock; the snippet assumes a `redis.Redis` client created with `decode_responses=True`.)

```python
# services/booking_service.py
import uuid
from contextlib import contextmanager
from datetime import datetime

import redis


class BookingService:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client  # assumes decode_responses=True

    @contextmanager
    def slot_lock(self, yacht_id: int, slot_start: datetime):
        """Distributed lock via Redis SET NX — prevents double-booking."""
        lock_key = f"booking:lock:{yacht_id}:{slot_start.timestamp()}"
        token = uuid.uuid4().hex  # unique token: never delete someone else's lock
        acquired = self.redis.set(lock_key, token, nx=True, ex=900)  # 15 min TTL
        if not acquired:
            raise BookingConflictError("Slot already reserved")
        try:
            yield
        finally:
            # Only delete if we still hold the lock
            if self.redis.get(lock_key) == token:
                self.redis.delete(lock_key)

    async def create_booking(
        self, user_id: int, yacht_id: int, slot: datetime
    ) -> Booking:
        with self.slot_lock(yacht_id, slot):
            # Double-check availability in DB (defense in depth)
            if not await self._is_slot_available(yacht_id, slot):
                raise BookingConflictError("Slot taken during lock acquisition")

            async with db.transaction():
                booking = await Booking.create(
                    user_id=user_id,
                    yacht_id=yacht_id,
                    slot_start=slot,
                    status="PENDING",
                )
                # Reserve slot in availability cache
                # (sync client here; a fully async service would use redis.asyncio)
                self.redis.setex(
                    f"availability:{yacht_id}:{slot.timestamp()}", 900, "reserved"
                )
                return booking
```
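The SET NX semantics are easy to verify in isolation. A minimal sketch, assuming an in-memory stand-in for Redis (`FakeRedis` here is hypothetical test scaffolding, not the real library of that name): the first user holds the lock, the second attempt on the same slot is rejected, and the lock is released on exit.

```python
import uuid
from contextlib import contextmanager


class BookingConflictError(Exception):
    pass


class FakeRedis:
    """Tiny in-memory stand-in for the Redis calls used by the lock."""

    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None  # SET NX fails if the key already exists
        self.store[key] = value
        return True

    def get(self, key):
        return self.store.get(key)

    def delete(self, key):
        self.store.pop(key, None)


@contextmanager
def slot_lock(r, yacht_id, slot_ts):
    lock_key = f"booking:lock:{yacht_id}:{slot_ts}"
    token = uuid.uuid4().hex
    if not r.set(lock_key, token, nx=True, ex=900):
        raise BookingConflictError("Slot already reserved")
    try:
        yield
    finally:
        if r.get(lock_key) == token:  # still our lock
            r.delete(lock_key)


r = FakeRedis()
with slot_lock(r, 42, 1700000000):
    # While the first user holds the lock, a second attempt fails
    try:
        with slot_lock(r, 42, 1700000000):
            second_won = True
    except BookingConflictError:
        second_won = False

print(second_won)                            # → False
print(r.get("booking:lock:42:1700000000"))   # → None (lock released)
```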
Case 1: the escrow capture and split payout once a trip completes.

```python
# services/payment_service.py
import stripe


async def handle_booking_completion(self, booking_id: int):
    """Capture payment 24h after trip ends — escrow logic."""
    booking = await Booking.get(booking_id)

    # Only capture if no dispute is open
    if booking.dispute_status != "OPEN":
        # Note: stripe's classic Python client is synchronous;
        # offload to a thread (or use the async variants) in async code
        stripe.PaymentIntent.capture(
            booking.stripe_payment_intent_id,
            amount_to_capture=booking.amount_captured,
        )
        # Split payout: 88% owner, 12% platform
        stripe.Transfer.create(
            amount=int(booking.amount_captured * 0.88),
            currency="aed",
            destination=booking.owner_stripe_account_id,
            source_transaction=booking.stripe_payment_intent_id,
        )
```

Case 2: encrypted patient fields with hashed lookup columns for search. (`pg_encrypt`, `pg_decrypt`, and `get_encryption_key` are pgcrypto helper wrappers defined elsewhere in the project.)

```python
# models/patient.py
import hashlib

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.hybrid import hybrid_property


class Patient(Base):
    __tablename__ = "patients"

    id = Column(Integer, primary_key=True)

    # Encrypted fields (pgcrypto AES-256)
    _full_name = Column("full_name", String(255))
    _phone = Column("phone", String(20))

    # Hash fields for search (SHA-256, indexed)
    phone_hash = Column(String(64), index=True)

    @hybrid_property
    def phone(self) -> str | None:
        """Decrypt on read — only when needed."""
        if not self._phone:
            return None
        return pg_decrypt(self._phone, get_encryption_key())

    @phone.setter
    def phone(self, value: str):
        """Encrypt on write + update hash for search."""
        self._phone = pg_encrypt(value, get_encryption_key())
        self.phone_hash = hashlib.sha256(value.encode()).hexdigest()

    @classmethod
    async def find_by_phone(cls, phone: str):
        """Search via hash — no decryption needed."""
        phone_hash = hashlib.sha256(phone.encode()).hexdigest()
        return await cls.query.filter(cls.phone_hash == phone_hash).first()
```
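One caveat with hash-based search, and an assumption on my part rather than something the model above handles: the same phone number can arrive as "+7 999…" or "8 999…", which produce different SHA-256 digests. Normalizing before hashing keeps lookups deterministic; `normalize_phone` below is a hypothetical helper for Russian numbers.

```python
import hashlib
import re


def normalize_phone(raw: str) -> str:
    """Reduce a phone number to a canonical digits-only form.

    Hypothetical normalization: strip punctuation, map the local
    '8' prefix to the country code '7'.
    """
    digits = re.sub(r"\D", "", raw)
    if len(digits) == 11 and digits.startswith("8"):
        digits = "7" + digits[1:]
    return digits


def phone_hash(raw: str) -> str:
    """Hash the canonical form so equivalent spellings share one key."""
    return hashlib.sha256(normalize_phone(raw).encode()).hexdigest()


# Different spellings of the same number now share one searchable hash
a = phone_hash("+7 (999) 123-45-67")
b = phone_hash("8 999 123 45 67")
print(a == b)  # → True
```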
Case 2: the circuit breaker wrapping the 1C:Medicine (MIS) client.

```python
# services/mis_client.py
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing — reject requests
    HALF_OPEN = "half_open"  # Testing recovery


class MISClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    async def call(self, endpoint: str, **kwargs):
        if self.state == CircuitState.OPEN:
            # Check if the recovery window has passed
            if time.time() - self.last_failure_time > 300:
                self.state = CircuitState.HALF_OPEN
            else:
                raise MISUnavailableError("Try phone booking")

        try:
            result = await self._make_request(endpoint, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= 3:
            self.state = CircuitState.OPEN
```
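To see the state transitions concretely, here is a condensed, synchronous sketch of the same breaker logic with the same thresholds (3 failures to open, 300s recovery window), exercised against an endpoint that always fails — the degraded path is exactly the "offer phone booking" fallback described above.

```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class MISUnavailableError(Exception):
    pass


class MiniBreaker:
    """Condensed, synchronous version of the breaker logic above."""

    def __init__(self, threshold=3, recovery=300):
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.threshold = threshold
        self.recovery = recovery
        self.last_failure = None

    def call(self, fn):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure > self.recovery:
                self.state = CircuitState.HALF_OPEN
            else:
                raise MISUnavailableError("Try phone booking")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            self.last_failure = time.time()
            if self.failures >= self.threshold:
                self.state = CircuitState.OPEN
            raise
        self.failures = 0
        self.state = CircuitState.CLOSED
        return result


def flaky():
    raise ConnectionError("MIS is down")


breaker = MiniBreaker()
for _ in range(3):  # three straight failures trip the breaker
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

print(breaker.state)  # → CircuitState.OPEN

# Degraded path: the bot falls back to phone booking instead of crashing
try:
    breaker.call(flaky)
except MISUnavailableError as e:
    print(e)  # → Try phone booking
```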
Case 2: the 3-tier triage pipeline — each tier is a fallback for the one above it.

```python
# services/triage_service.py
class TriageService:
    async def triage(self, symptoms: str) -> str:
        # L1: Keyword Router (0ms, 78% of requests)
        if route := self.keyword_router.match(symptoms):
            return route

        # L2: RAG + Cache (~100ms, 17% of requests)
        if response := await self.rag_service.query(symptoms):
            return response

        # L3: GigaChat API (2–6s, 5% of requests)
        return await self.llm.generate(symptoms)
```

Case 3: the pre-router that dispatches callbacks by prefix before they reach feature routers.

```python
# routers/pre_router.py
from aiogram import Router
from aiogram.types import CallbackQuery


class PreRouter(Router):
    """Routes callbacks before they hit specific routers — prevents conflicts."""

    def __init__(self):
        super().__init__()
        # Pattern-based routing: callback data starts with a prefix
        self.patterns = {
            "somatype_": self._route_somatype,
            "pc_": self._route_pc,
            "meeting_": self._route_meeting,
            "payment_": self._route_payment,
        }

    async def process_callback(self, callback: CallbackQuery):
        data = callback.data
        for prefix, handler in self.patterns.items():
            if data.startswith(prefix):
                return await handler(callback)
        # Fallback: let other routers try
        return await super().process_callback(callback)
```
Case 3: the hybrid agent behind the 95%-instant figure.

```python
# services/hybrid_ai.py
import asyncio
import hashlib
import re

from llama_cpp import Llama


class HybridAIAgent:
    def __init__(self):
        self.keyword_router = KeywordRouter()
        self.cache = ResponseCache(max_size=200)
        self.llm = Llama(model_path="Qwen2.5-3B-Instruct-Q4_K_M")

    async def respond(self, user_id: int, text: str) -> str:
        # Tier 1: Keyword Router (0ms)
        if response := await self.keyword_router.route(text):
            return response

        # Tier 2: Cache (0.001s)
        cache_key = self._normalize(text)
        if cached := await self.cache.get(cache_key):
            return cached

        # Tier 3: LLM + RAG (6–19s, but only 5% of requests)
        rag_context = await self.rag_service.get_relevant_chunks(text)
        prompt = self._build_prompt(text, rag_context)
        response = await asyncio.wait_for(
            asyncio.to_thread(self.llm.generate, prompt),
            timeout=15.0,
        )

        # Cache successful responses
        await self.cache.set(cache_key, response, ttl=3600)
        return response

    def _normalize(self, text: str) -> str:
        """MD5-normalize for cache: identical questions = same key."""
        normalized = re.sub(r"[^\w\s]", "", text.lower().strip())
        return hashlib.md5(normalized.encode()).hexdigest()
```

## Tech Stack (Case 1: Yacht Marketplace)

- Backend: Python 3.12, FastAPI, Celery 5 + Redis, SQLAlchemy 2.0 async
- Frontend: React 18 + TypeScript + Zustand + Tailwind + Mapbox GL JS
- Mobile: React Native 0.73 + Expo SDK 50 (Turborepo monorepo)
- Infra: AWS ECS Fargate, RDS Multi-AZ, ElastiCache Redis, Terraform
- Payments: Stripe Connect Marketplace (escrow, split payments, disputes)
- KYC: Sumsub API (UAE-specific: DCCA license, Emirates ID)
- AI: GPT-4o-mini Route Advisor (EN/AR), ML Price Recommendation

## Tech Stack (Case 2: Clinic AI)

- Bot: Python 3.12, aiogram 3.4 async, APScheduler 3.10 (12 jobs)
- Backend: FastAPI + SQLAlchemy 2.0 async, pgcrypto AES-256
- AI: Keyword Router (94 patterns) → ChromaDB RAG + rubert-tiny2 → GigaChat API
- PWA: React 18 + TypeScript + Vite, Service Worker offline cache, Web Push API
- Integration: 1C:Medicine REST API + circuit breaker, 1C:Buhgalteriya export
- Infra: Selectel VPS (Saint Petersburg, RF), LUKS full disk encryption

## Tech Stack (Case 3: Dating Platform)

- Bot: Python 3.11, aiogram 3 async, FSM state machines
- Web: FastAPI + Jinja2 + vanilla JS 15KB (somatype test)
- AI: 3-tier Hybrid (L1: 154 patterns → L2: LRU Cache MD5 → L3: Qwen2.5-3B + ChromaDB RAG)
- TTS: Long AI responses synthesized as voice messages
- DB: PostgreSQL 16 native, RPC find_match() — full matching algorithm in the database
- Payments: Alfa-Bank REST API (HMAC webhook verification, idempotency)
- Infra: Beget VPS, systemd services, Nginx SSL, 24 security measures

What I build:

- AI-powered automation systems (chatbots, triage, routing)
- Cross-platform marketplaces (iOS + Android + Web from a monorepo)
- Enterprise backend with compliance (152-FZ, GDPR, healthcare, fintech)
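The Case 3 stack lists HMAC webhook verification for the Alfa-Bank payments integration. A generic stdlib sketch of that kind of check — the header name, secret handling, and payload shape are assumptions for illustration, not Alfa-Bank's actual contract:

```python
import hashlib
import hmac


def verify_webhook(payload: bytes, signature_header: str, secret: bytes) -> bool:
    """Recompute the HMAC-SHA256 of the raw body and compare in constant time."""
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    # compare_digest avoids timing side channels on the comparison
    return hmac.compare_digest(expected, signature_header)


secret = b"webhook-secret"  # stored in config, never in code
body = b'{"order_id": "123", "status": "paid"}'
good_sig = hmac.new(secret, body, hashlib.sha256).hexdigest()

print(verify_webhook(body, good_sig, secret))        # → True
print(verify_webhook(body, "deadbeef" * 8, secret))  # → False
```

Verifying against the raw request bytes (before any JSON parsing) matters: re-serializing the parsed body can reorder keys and silently break the signature.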