5839201847:AAHxyz_example_token_here_abc123def456
5839201847:AAHxyz_example_token_here_abc123def456
5839201847:AAHxyz_example_token_here_abc123def456
https://api.telegram.org/botYOUR_TOKEN/getUpdates
https://api.telegram.org/botYOUR_TOKEN/getUpdates
https://api.telegram.org/botYOUR_TOKEN/getUpdates
mkdir fastapi-telegram-bot && cd fastapi-telegram-bot
python -m venv venv && source venv/bin/activate
pip install fastapi uvicorn httpx python-dotenv pydantic
mkdir fastapi-telegram-bot && cd fastapi-telegram-bot
python -m venv venv && source venv/bin/activate
pip install fastapi uvicorn httpx python-dotenv pydantic
mkdir fastapi-telegram-bot && cd fastapi-telegram-bot
python -m venv venv && source venv/bin/activate
pip install fastapi uvicorn httpx python-dotenv pydantic
TELEGRAM_BOT_TOKEN=5839201847:AAHxyz_your_actual_token
TELEGRAM_CHAT_ID=123456789
WEBHOOK_SECRET=your_random_secret_string_here
TELEGRAM_BOT_TOKEN=5839201847:AAHxyz_your_actual_token
TELEGRAM_CHAT_ID=123456789
WEBHOOK_SECRET=your_random_secret_string_here
TELEGRAM_BOT_TOKEN=5839201847:AAHxyz_your_actual_token
TELEGRAM_CHAT_ID=123456789
WEBHOOK_SECRET=your_random_secret_string_here
# FastAPI webhook that forwards JSON alerts to a Telegram chat.
#
# Configuration comes from the environment (optionally loaded from a .env
# file): TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID and WEBHOOK_SECRET. Importing
# this module raises RuntimeError when any of them is missing or empty.
import asyncio
import html
import logging
import os
import secrets
from contextlib import asynccontextmanager
from typing import Any

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header, Request, status
from pydantic import BaseModel, Field

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# httpx logs every request URL at INFO level, and the Telegram URL embeds the
# bot token; raise its threshold so the token never reaches the logs.
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")

if not all([TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, WEBHOOK_SECRET]):
    raise RuntimeError("Missing required environment variables. Check .env file.")

TELEGRAM_API_URL = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}"

# Wait used for a 429 response that carries no usable retry hint.
DEFAULT_RETRY_AFTER_SECONDS = 5


class AlertPayload(BaseModel):
    """Request body accepted by ``POST /alert/{secret}``."""

    # Rendered in bold on the first line of the Telegram message.
    title: str = Field(..., min_length=1, max_length=100)
    # Message body; forwarded as-is, so it may contain Telegram HTML markup.
    message: str = Field(..., min_length=1, max_length=3000)
    # One of "info", "warning" or "critical"; selects the emoji prefix.
    severity: str = Field(default="info", pattern="^(info|warning|critical)$")
    # Free-form name of the system that produced the alert.
    source: str = Field(default="system")
    # Override default chat if needed
    chat_id: str | None = None


def _retry_after_seconds(response: httpx.Response) -> int:
    """Return how long to wait after a 429 response, in seconds.

    Prefers an integer ``Retry-After`` header, then the ``retry_after`` value
    in the ``parameters`` object of the JSON body, and finally
    ``DEFAULT_RETRY_AFTER_SECONDS``. Never returns a negative number.
    """
    header_value = response.headers.get("Retry-After")
    if header_value is not None:
        try:
            return max(0, int(header_value))
        except ValueError:
            # Non-integer header (e.g. an HTTP date): fall through to the body.
            logger.debug("Ignoring non-integer Retry-After header: %r", header_value)
    try:
        return max(0, int(response.json()["parameters"]["retry_after"]))
    except (ValueError, KeyError, TypeError):
        return DEFAULT_RETRY_AFTER_SECONDS


class TelegramSender:
    """Sends messages through the Telegram ``sendMessage`` endpoint."""

    def __init__(self):
        # One shared client so connections are reused across requests.
        self.client = httpx.AsyncClient(timeout=10.0)

    async def send_message(
        self,
        text: str,
        chat_id: str,
        parse_mode: str = "HTML",
        retries: int = 3,
    ) -> dict[str, Any]:
        """Send ``text`` to ``chat_id`` and return Telegram's JSON response.

        Timeouts and HTTP 429 responses are retried, up to ``retries``
        attempts in total; any other HTTP error status is raised immediately.

        Raises:
            httpx.TimeoutException: the final attempt timed out.
            httpx.HTTPStatusError: Telegram answered with a non-429 error status.
            ValueError: the response body reports ``ok`` as false.
            RuntimeError: every attempt was used up (rate limited throughout,
                or ``retries`` is not positive).
        """
        url = f"{TELEGRAM_API_URL}/sendMessage"
        payload = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": parse_mode,
            "disable_web_page_preview": True,
        }

        for attempt in range(retries):
            is_last_attempt = attempt == retries - 1
            try:
                response = await self.client.post(url, json=payload)
                response.raise_for_status()
            except httpx.TimeoutException:
                logger.warning("Timeout on attempt %s/%s", attempt + 1, retries)
                if is_last_attempt:
                    raise
                continue
            except httpx.HTTPStatusError as e:
                # 429 = rate limited, back off and retry
                if e.response.status_code != 429:
                    raise
                if is_last_attempt:
                    # Sleeping now would only delay the failure below.
                    logger.warning(
                        "Rate limited on final attempt %s/%s", attempt + 1, retries
                    )
                    break
                retry_after = _retry_after_seconds(e.response)
                logger.warning("Rate limited. Waiting %ss before retry.", retry_after)
                await asyncio.sleep(retry_after)
                continue

            result = response.json()
            if not result.get("ok"):
                raise ValueError(f"Telegram API error: {result.get('description')}")
            logger.info(
                "Message sent successfully. Message ID: %s",
                result["result"]["message_id"],
            )
            return result

        raise RuntimeError("Failed to send message after all retries")

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()


# Module-level sender shared by all requests; closed in ``lifespan``.
telegram = TelegramSender()


def format_alert_message(alert: AlertPayload) -> str:
    """Render ``alert`` as Telegram HTML.

    ``title`` and ``source`` are HTML-escaped so a stray ``<`` or ``&`` cannot
    break Telegram's HTML parsing. ``message`` is passed through unchanged
    because callers may deliberately include Telegram markup such as ``<b>``.
    """
    severity_emoji = {
        "info": "ℹ️",
        "warning": "⚠️",
        "critical": "🚨",
    }
    emoji = severity_emoji.get(alert.severity, "ℹ️")
    title = html.escape(alert.title, quote=False)
    source = html.escape(alert.source, quote=False)
    return (
        f"{emoji} <b>[{alert.severity.upper()}] {title}</b>\n\n"
        f"{alert.message}\n\n"
        f"<i>Source: {source}</i>"
    )


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log startup, then close the shared HTTP client on shutdown."""
    logger.info("Starting FastAPI Telegram Alert Bot")
    yield
    logger.info("Shutting down — closing HTTP client")
    await telegram.close()


app = FastAPI(
    title="Telegram Alert Bot",
    description="Real-time alert delivery via Telegram",
    version="1.0.0",
    lifespan=lifespan,
)


@app.post("/alert/{secret}", status_code=status.HTTP_200_OK)
async def receive_alert(secret: str, alert: AlertPayload, request: Request):
    """Validate the webhook secret and forward ``alert`` to Telegram.

    Returns a dict with ``status``, ``message_id`` and ``chat_id``.

    Raises:
        HTTPException: 403 when ``secret`` does not match ``WEBHOOK_SECRET``;
            502 when Telegram answers with an HTTP error status; 500 for any
            other delivery failure.
    """
    # Validate the secret before doing anything else. compare_digest avoids
    # leaking how many leading characters matched; comparing bytes keeps a
    # non-ASCII path segment from raising TypeError.
    if not secrets.compare_digest(
        secret.encode("utf-8"), WEBHOOK_SECRET.encode("utf-8")
    ):
        # request.client is None when the server cannot determine the peer.
        client_host = request.client.host if request.client else "unknown"
        logger.warning("Invalid secret attempt from %s", client_host)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid webhook secret",
        )

    chat_id = alert.chat_id or TELEGRAM_CHAT_ID
    message_text = format_alert_message(alert)

    try:
        result = await telegram.send_message(text=message_text, chat_id=chat_id)
        return {
            "status": "delivered",
            "message_id": result["result"]["message_id"],
            "chat_id": chat_id,
        }
    except httpx.HTTPStatusError as e:
        logger.error(
            "Telegram HTTP error: %s - %s", e.response.status_code, e.response.text
        )
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Telegram API returned {e.response.status_code}",
        ) from e
    except Exception as e:
        # Top-level boundary: log with traceback, return a generic 500.
        logger.exception("Unexpected error sending alert: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to deliver alert",
        ) from e


@app.get("/health")
async def health_check():
    """Report liveness and whether a bot token is configured."""
    return {"status": "ok", "bot_configured": bool(TELEGRAM_BOT_TOKEN)}
# FastAPI webhook that forwards JSON alerts to a Telegram chat.
#
# Configuration comes from the environment (optionally loaded from a .env
# file): TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID and WEBHOOK_SECRET. Importing
# this module raises RuntimeError when any of them is missing or empty.
import asyncio
import html
import logging
import os
import secrets
from contextlib import asynccontextmanager
from typing import Any

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header, Request, status
from pydantic import BaseModel, Field

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# httpx logs every request URL at INFO level, and the Telegram URL embeds the
# bot token; raise its threshold so the token never reaches the logs.
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")

if not all([TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, WEBHOOK_SECRET]):
    raise RuntimeError("Missing required environment variables. Check .env file.")

TELEGRAM_API_URL = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}"

# Wait used for a 429 response that carries no usable retry hint.
DEFAULT_RETRY_AFTER_SECONDS = 5


class AlertPayload(BaseModel):
    """Request body accepted by ``POST /alert/{secret}``."""

    # Rendered in bold on the first line of the Telegram message.
    title: str = Field(..., min_length=1, max_length=100)
    # Message body; forwarded as-is, so it may contain Telegram HTML markup.
    message: str = Field(..., min_length=1, max_length=3000)
    # One of "info", "warning" or "critical"; selects the emoji prefix.
    severity: str = Field(default="info", pattern="^(info|warning|critical)$")
    # Free-form name of the system that produced the alert.
    source: str = Field(default="system")
    # Override default chat if needed
    chat_id: str | None = None


def _retry_after_seconds(response: httpx.Response) -> int:
    """Return how long to wait after a 429 response, in seconds.

    Prefers an integer ``Retry-After`` header, then the ``retry_after`` value
    in the ``parameters`` object of the JSON body, and finally
    ``DEFAULT_RETRY_AFTER_SECONDS``. Never returns a negative number.
    """
    header_value = response.headers.get("Retry-After")
    if header_value is not None:
        try:
            return max(0, int(header_value))
        except ValueError:
            # Non-integer header (e.g. an HTTP date): fall through to the body.
            logger.debug("Ignoring non-integer Retry-After header: %r", header_value)
    try:
        return max(0, int(response.json()["parameters"]["retry_after"]))
    except (ValueError, KeyError, TypeError):
        return DEFAULT_RETRY_AFTER_SECONDS


class TelegramSender:
    """Sends messages through the Telegram ``sendMessage`` endpoint."""

    def __init__(self):
        # One shared client so connections are reused across requests.
        self.client = httpx.AsyncClient(timeout=10.0)

    async def send_message(
        self,
        text: str,
        chat_id: str,
        parse_mode: str = "HTML",
        retries: int = 3,
    ) -> dict[str, Any]:
        """Send ``text`` to ``chat_id`` and return Telegram's JSON response.

        Timeouts and HTTP 429 responses are retried, up to ``retries``
        attempts in total; any other HTTP error status is raised immediately.

        Raises:
            httpx.TimeoutException: the final attempt timed out.
            httpx.HTTPStatusError: Telegram answered with a non-429 error status.
            ValueError: the response body reports ``ok`` as false.
            RuntimeError: every attempt was used up (rate limited throughout,
                or ``retries`` is not positive).
        """
        url = f"{TELEGRAM_API_URL}/sendMessage"
        payload = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": parse_mode,
            "disable_web_page_preview": True,
        }

        for attempt in range(retries):
            is_last_attempt = attempt == retries - 1
            try:
                response = await self.client.post(url, json=payload)
                response.raise_for_status()
            except httpx.TimeoutException:
                logger.warning("Timeout on attempt %s/%s", attempt + 1, retries)
                if is_last_attempt:
                    raise
                continue
            except httpx.HTTPStatusError as e:
                # 429 = rate limited, back off and retry
                if e.response.status_code != 429:
                    raise
                if is_last_attempt:
                    # Sleeping now would only delay the failure below.
                    logger.warning(
                        "Rate limited on final attempt %s/%s", attempt + 1, retries
                    )
                    break
                retry_after = _retry_after_seconds(e.response)
                logger.warning("Rate limited. Waiting %ss before retry.", retry_after)
                await asyncio.sleep(retry_after)
                continue

            result = response.json()
            if not result.get("ok"):
                raise ValueError(f"Telegram API error: {result.get('description')}")
            logger.info(
                "Message sent successfully. Message ID: %s",
                result["result"]["message_id"],
            )
            return result

        raise RuntimeError("Failed to send message after all retries")

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()


# Module-level sender shared by all requests; closed in ``lifespan``.
telegram = TelegramSender()


def format_alert_message(alert: AlertPayload) -> str:
    """Render ``alert`` as Telegram HTML.

    ``title`` and ``source`` are HTML-escaped so a stray ``<`` or ``&`` cannot
    break Telegram's HTML parsing. ``message`` is passed through unchanged
    because callers may deliberately include Telegram markup such as ``<b>``.
    """
    severity_emoji = {
        "info": "ℹ️",
        "warning": "⚠️",
        "critical": "🚨",
    }
    emoji = severity_emoji.get(alert.severity, "ℹ️")
    title = html.escape(alert.title, quote=False)
    source = html.escape(alert.source, quote=False)
    return (
        f"{emoji} <b>[{alert.severity.upper()}] {title}</b>\n\n"
        f"{alert.message}\n\n"
        f"<i>Source: {source}</i>"
    )


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log startup, then close the shared HTTP client on shutdown."""
    logger.info("Starting FastAPI Telegram Alert Bot")
    yield
    logger.info("Shutting down — closing HTTP client")
    await telegram.close()


app = FastAPI(
    title="Telegram Alert Bot",
    description="Real-time alert delivery via Telegram",
    version="1.0.0",
    lifespan=lifespan,
)


@app.post("/alert/{secret}", status_code=status.HTTP_200_OK)
async def receive_alert(secret: str, alert: AlertPayload, request: Request):
    """Validate the webhook secret and forward ``alert`` to Telegram.

    Returns a dict with ``status``, ``message_id`` and ``chat_id``.

    Raises:
        HTTPException: 403 when ``secret`` does not match ``WEBHOOK_SECRET``;
            502 when Telegram answers with an HTTP error status; 500 for any
            other delivery failure.
    """
    # Validate the secret before doing anything else. compare_digest avoids
    # leaking how many leading characters matched; comparing bytes keeps a
    # non-ASCII path segment from raising TypeError.
    if not secrets.compare_digest(
        secret.encode("utf-8"), WEBHOOK_SECRET.encode("utf-8")
    ):
        # request.client is None when the server cannot determine the peer.
        client_host = request.client.host if request.client else "unknown"
        logger.warning("Invalid secret attempt from %s", client_host)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid webhook secret",
        )

    chat_id = alert.chat_id or TELEGRAM_CHAT_ID
    message_text = format_alert_message(alert)

    try:
        result = await telegram.send_message(text=message_text, chat_id=chat_id)
        return {
            "status": "delivered",
            "message_id": result["result"]["message_id"],
            "chat_id": chat_id,
        }
    except httpx.HTTPStatusError as e:
        logger.error(
            "Telegram HTTP error: %s - %s", e.response.status_code, e.response.text
        )
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Telegram API returned {e.response.status_code}",
        ) from e
    except Exception as e:
        # Top-level boundary: log with traceback, return a generic 500.
        logger.exception("Unexpected error sending alert: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to deliver alert",
        ) from e


@app.get("/health")
async def health_check():
    """Report liveness and whether a bot token is configured."""
    return {"status": "ok", "bot_configured": bool(TELEGRAM_BOT_TOKEN)}
# FastAPI webhook that forwards JSON alerts to a Telegram chat.
#
# Configuration comes from the environment (optionally loaded from a .env
# file): TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID and WEBHOOK_SECRET. Importing
# this module raises RuntimeError when any of them is missing or empty.
import asyncio
import html
import logging
import os
import secrets
from contextlib import asynccontextmanager
from typing import Any

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header, Request, status
from pydantic import BaseModel, Field

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# httpx logs every request URL at INFO level, and the Telegram URL embeds the
# bot token; raise its threshold so the token never reaches the logs.
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")

if not all([TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, WEBHOOK_SECRET]):
    raise RuntimeError("Missing required environment variables. Check .env file.")

TELEGRAM_API_URL = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}"

# Wait used for a 429 response that carries no usable retry hint.
DEFAULT_RETRY_AFTER_SECONDS = 5


class AlertPayload(BaseModel):
    """Request body accepted by ``POST /alert/{secret}``."""

    # Rendered in bold on the first line of the Telegram message.
    title: str = Field(..., min_length=1, max_length=100)
    # Message body; forwarded as-is, so it may contain Telegram HTML markup.
    message: str = Field(..., min_length=1, max_length=3000)
    # One of "info", "warning" or "critical"; selects the emoji prefix.
    severity: str = Field(default="info", pattern="^(info|warning|critical)$")
    # Free-form name of the system that produced the alert.
    source: str = Field(default="system")
    # Override default chat if needed
    chat_id: str | None = None


def _retry_after_seconds(response: httpx.Response) -> int:
    """Return how long to wait after a 429 response, in seconds.

    Prefers an integer ``Retry-After`` header, then the ``retry_after`` value
    in the ``parameters`` object of the JSON body, and finally
    ``DEFAULT_RETRY_AFTER_SECONDS``. Never returns a negative number.
    """
    header_value = response.headers.get("Retry-After")
    if header_value is not None:
        try:
            return max(0, int(header_value))
        except ValueError:
            # Non-integer header (e.g. an HTTP date): fall through to the body.
            logger.debug("Ignoring non-integer Retry-After header: %r", header_value)
    try:
        return max(0, int(response.json()["parameters"]["retry_after"]))
    except (ValueError, KeyError, TypeError):
        return DEFAULT_RETRY_AFTER_SECONDS


class TelegramSender:
    """Sends messages through the Telegram ``sendMessage`` endpoint."""

    def __init__(self):
        # One shared client so connections are reused across requests.
        self.client = httpx.AsyncClient(timeout=10.0)

    async def send_message(
        self,
        text: str,
        chat_id: str,
        parse_mode: str = "HTML",
        retries: int = 3,
    ) -> dict[str, Any]:
        """Send ``text`` to ``chat_id`` and return Telegram's JSON response.

        Timeouts and HTTP 429 responses are retried, up to ``retries``
        attempts in total; any other HTTP error status is raised immediately.

        Raises:
            httpx.TimeoutException: the final attempt timed out.
            httpx.HTTPStatusError: Telegram answered with a non-429 error status.
            ValueError: the response body reports ``ok`` as false.
            RuntimeError: every attempt was used up (rate limited throughout,
                or ``retries`` is not positive).
        """
        url = f"{TELEGRAM_API_URL}/sendMessage"
        payload = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": parse_mode,
            "disable_web_page_preview": True,
        }

        for attempt in range(retries):
            is_last_attempt = attempt == retries - 1
            try:
                response = await self.client.post(url, json=payload)
                response.raise_for_status()
            except httpx.TimeoutException:
                logger.warning("Timeout on attempt %s/%s", attempt + 1, retries)
                if is_last_attempt:
                    raise
                continue
            except httpx.HTTPStatusError as e:
                # 429 = rate limited, back off and retry
                if e.response.status_code != 429:
                    raise
                if is_last_attempt:
                    # Sleeping now would only delay the failure below.
                    logger.warning(
                        "Rate limited on final attempt %s/%s", attempt + 1, retries
                    )
                    break
                retry_after = _retry_after_seconds(e.response)
                logger.warning("Rate limited. Waiting %ss before retry.", retry_after)
                await asyncio.sleep(retry_after)
                continue

            result = response.json()
            if not result.get("ok"):
                raise ValueError(f"Telegram API error: {result.get('description')}")
            logger.info(
                "Message sent successfully. Message ID: %s",
                result["result"]["message_id"],
            )
            return result

        raise RuntimeError("Failed to send message after all retries")

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()


# Module-level sender shared by all requests; closed in ``lifespan``.
telegram = TelegramSender()


def format_alert_message(alert: AlertPayload) -> str:
    """Render ``alert`` as Telegram HTML.

    ``title`` and ``source`` are HTML-escaped so a stray ``<`` or ``&`` cannot
    break Telegram's HTML parsing. ``message`` is passed through unchanged
    because callers may deliberately include Telegram markup such as ``<b>``.
    """
    severity_emoji = {
        "info": "ℹ️",
        "warning": "⚠️",
        "critical": "🚨",
    }
    emoji = severity_emoji.get(alert.severity, "ℹ️")
    title = html.escape(alert.title, quote=False)
    source = html.escape(alert.source, quote=False)
    return (
        f"{emoji} <b>[{alert.severity.upper()}] {title}</b>\n\n"
        f"{alert.message}\n\n"
        f"<i>Source: {source}</i>"
    )


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log startup, then close the shared HTTP client on shutdown."""
    logger.info("Starting FastAPI Telegram Alert Bot")
    yield
    logger.info("Shutting down — closing HTTP client")
    await telegram.close()


app = FastAPI(
    title="Telegram Alert Bot",
    description="Real-time alert delivery via Telegram",
    version="1.0.0",
    lifespan=lifespan,
)


@app.post("/alert/{secret}", status_code=status.HTTP_200_OK)
async def receive_alert(secret: str, alert: AlertPayload, request: Request):
    """Validate the webhook secret and forward ``alert`` to Telegram.

    Returns a dict with ``status``, ``message_id`` and ``chat_id``.

    Raises:
        HTTPException: 403 when ``secret`` does not match ``WEBHOOK_SECRET``;
            502 when Telegram answers with an HTTP error status; 500 for any
            other delivery failure.
    """
    # Validate the secret before doing anything else. compare_digest avoids
    # leaking how many leading characters matched; comparing bytes keeps a
    # non-ASCII path segment from raising TypeError.
    if not secrets.compare_digest(
        secret.encode("utf-8"), WEBHOOK_SECRET.encode("utf-8")
    ):
        # request.client is None when the server cannot determine the peer.
        client_host = request.client.host if request.client else "unknown"
        logger.warning("Invalid secret attempt from %s", client_host)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid webhook secret",
        )

    chat_id = alert.chat_id or TELEGRAM_CHAT_ID
    message_text = format_alert_message(alert)

    try:
        result = await telegram.send_message(text=message_text, chat_id=chat_id)
        return {
            "status": "delivered",
            "message_id": result["result"]["message_id"],
            "chat_id": chat_id,
        }
    except httpx.HTTPStatusError as e:
        logger.error(
            "Telegram HTTP error: %s - %s", e.response.status_code, e.response.text
        )
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Telegram API returned {e.response.status_code}",
        ) from e
    except Exception as e:
        # Top-level boundary: log with traceback, return a generic 500.
        logger.exception("Unexpected error sending alert: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to deliver alert",
        ) from e


@app.get("/health")
async def health_check():
    """Report liveness and whether a bot token is configured."""
    return {"status": "ok", "bot_configured": bool(TELEGRAM_BOT_TOKEN)}
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
curl -X POST "http://localhost:8000/alert/your_secret_here" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "CPU Spike Detected",
    "message": "Server cpu01 hit 97% CPU usage. Load average: 14.2, 13.8, 12.1. Top process: python3 (PID 18432).",
    "severity": "critical",
    "source": "prometheus-alertmanager"
  }'
curl -X POST "http://localhost:8000/alert/your_secret_here" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "CPU Spike Detected",
    "message": "Server cpu01 hit 97% CPU usage. Load average: 14.2, 13.8, 12.1. Top process: python3 (PID 18432).",
    "severity": "critical",
    "source": "prometheus-alertmanager"
  }'
curl -X POST "http://localhost:8000/alert/your_secret_here" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "CPU Spike Detected",
    "message": "Server cpu01 hit 97% CPU usage. Load average: 14.2, 13.8, 12.1. Top process: python3 (PID 18432).",
    "severity": "critical",
    "source": "prometheus-alertmanager"
  }'
{ "status": "delivered", "message_id": 47, "chat_id": "123456789"
}
{ "status": "delivered", "message_id": 47, "chat_id": "123456789"
}
{ "status": "delivered", "message_id": 47, "chat_id": "123456789"
}
🚨 [CRITICAL] CPU Spike Detected Server cpu01 hit 97% CPU usage. Load average: 14.2, 13.8, 12.1. Top process: python3 (PID 18432). Source: prometheus-alertmanager
🚨 [CRITICAL] CPU Spike Detected Server cpu01 hit 97% CPU usage. Load average: 14.2, 13.8, 12.1. Top process: python3 (PID 18432). Source: prometheus-alertmanager
🚨 [CRITICAL] CPU Spike Detected Server cpu01 hit 97% CPU usage. Load average: 14.2, 13.8, 12.1. Top process: python3 (PID 18432). Source: prometheus-alertmanager
import asyncio

import httpx

# Placeholder endpoint; the path segment after /alert/ is the webhook secret.
DEFAULT_ALERT_URL = "https://your-vps.com/alert/your_secret_here"


async def send_trading_alert(
    signal: dict,
    alert_url: str = DEFAULT_ALERT_URL,
    timeout: float = 5.0,
) -> dict:
    """POST a trading signal to the alert webhook and return its JSON reply.

    Args:
        signal: Mapping that must provide ``symbol``, ``action``, ``price``,
            ``rsi``, ``macd`` and ``confidence``. ``price``, ``rsi`` and
            ``macd`` are rendered with float format specs, so they must be
            numeric; ``confidence`` is compared against 75.
        alert_url: Full webhook URL including the secret path segment.
        timeout: Request timeout in seconds passed to ``httpx.AsyncClient``.

    Raises:
        KeyError: ``signal`` lacks one of the required keys.
        httpx.HTTPStatusError: the webhook answered with an error status.
    """
    # The message uses Telegram HTML tags (<b>) for emphasis.
    message = (
        f"Symbol: <b>{signal['symbol']}</b>\n"
        f"Action: <b>{signal['action']}</b>\n"
        f"Price: ${signal['price']:.4f}\n"
        f"RSI: {signal['rsi']:.1f} | MACD: {signal['macd']:.6f}\n"
        f"Confidence: {signal['confidence']}%"
    )
    payload = {
        "title": f"{signal['action']} Signal — {signal['symbol']}",
        "message": message,
        # Confidence of 75 or more is escalated to "critical".
        "severity": "warning" if signal['confidence'] < 75 else "critical",
        "source": "trading-engine-v2",
    }

    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(alert_url, json=payload)
        response.raise_for_status()
        return response.json()
import asyncio

import httpx

# Placeholder endpoint; the path segment after /alert/ is the webhook secret.
DEFAULT_ALERT_URL = "https://your-vps.com/alert/your_secret_here"


async def send_trading_alert(
    signal: dict,
    alert_url: str = DEFAULT_ALERT_URL,
    timeout: float = 5.0,
) -> dict:
    """POST a trading signal to the alert webhook and return its JSON reply.

    Args:
        signal: Mapping that must provide ``symbol``, ``action``, ``price``,
            ``rsi``, ``macd`` and ``confidence``. ``price``, ``rsi`` and
            ``macd`` are rendered with float format specs, so they must be
            numeric; ``confidence`` is compared against 75.
        alert_url: Full webhook URL including the secret path segment.
        timeout: Request timeout in seconds passed to ``httpx.AsyncClient``.

    Raises:
        KeyError: ``signal`` lacks one of the required keys.
        httpx.HTTPStatusError: the webhook answered with an error status.
    """
    # The message uses Telegram HTML tags (<b>) for emphasis.
    message = (
        f"Symbol: <b>{signal['symbol']}</b>\n"
        f"Action: <b>{signal['action']}</b>\n"
        f"Price: ${signal['price']:.4f}\n"
        f"RSI: {signal['rsi']:.1f} | MACD: {signal['macd']:.6f}\n"
        f"Confidence: {signal['confidence']}%"
    )
    payload = {
        "title": f"{signal['action']} Signal — {signal['symbol']}",
        "message": message,
        # Confidence of 75 or more is escalated to "critical".
        "severity": "warning" if signal['confidence'] < 75 else "critical",
        "source": "trading-engine-v2",
    }

    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(alert_url, json=payload)
        response.raise_for_status()
        return response.json()
import asyncio

import httpx

# Placeholder endpoint; the path segment after /alert/ is the webhook secret.
DEFAULT_ALERT_URL = "https://your-vps.com/alert/your_secret_here"


async def send_trading_alert(
    signal: dict,
    alert_url: str = DEFAULT_ALERT_URL,
    timeout: float = 5.0,
) -> dict:
    """POST a trading signal to the alert webhook and return its JSON reply.

    Args:
        signal: Mapping that must provide ``symbol``, ``action``, ``price``,
            ``rsi``, ``macd`` and ``confidence``. ``price``, ``rsi`` and
            ``macd`` are rendered with float format specs, so they must be
            numeric; ``confidence`` is compared against 75.
        alert_url: Full webhook URL including the secret path segment.
        timeout: Request timeout in seconds passed to ``httpx.AsyncClient``.

    Raises:
        KeyError: ``signal`` lacks one of the required keys.
        httpx.HTTPStatusError: the webhook answered with an error status.
    """
    # The message uses Telegram HTML tags (<b>) for emphasis.
    message = (
        f"Symbol: <b>{signal['symbol']}</b>\n"
        f"Action: <b>{signal['action']}</b>\n"
        f"Price: ${signal['price']:.4f}\n"
        f"RSI: {signal['rsi']:.1f} | MACD: {signal['macd']:.6f}\n"
        f"Confidence: {signal['confidence']}%"
    )
    payload = {
        "title": f"{signal['action']} Signal — {signal['symbol']}",
        "message": message,
        # Confidence of 75 or more is escalated to "critical".
        "severity": "warning" if signal['confidence'] < 75 else "critical",
        "source": "trading-engine-v2",
    }

    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(alert_url, json=payload)
        response.raise_for_status()
        return response.json()
# /etc/systemd/system/telegram-alert-bot.service
[Unit]
Description=FastAPI Telegram Alert Bot
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/fastapi-telegram-bot
EnvironmentFile=/home/ubuntu/fastapi-telegram-bot/.env
ExecStart=/home/ubuntu/fastapi-telegram-bot/venv/bin/uvicorn main:app --host 0.0.0.0 --port 8000 --workers 2
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
# /etc/systemd/system/telegram-alert-bot.service
[Unit]
Description=FastAPI Telegram Alert Bot
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/fastapi-telegram-bot
EnvironmentFile=/home/ubuntu/fastapi-telegram-bot/.env
ExecStart=/home/ubuntu/fastapi-telegram-bot/venv/bin/uvicorn main:app --host 0.0.0.0 --port 8000 --workers 2
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
# /etc/systemd/system/telegram-alert-bot.service
[Unit]
Description=FastAPI Telegram Alert Bot
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/fastapi-telegram-bot
EnvironmentFile=/home/ubuntu/fastapi-telegram-bot/.env
ExecStart=/home/ubuntu/fastapi-telegram-bot/venv/bin/uvicorn main:app --host 0.0.0.0 --port 8000 --workers 2
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable telegram-alert-bot
sudo systemctl start telegram-alert-bot
sudo systemctl status telegram-alert-bot
sudo systemctl daemon-reload
sudo systemctl enable telegram-alert-bot
sudo systemctl start telegram-alert-bot
sudo systemctl status telegram-alert-bot
sudo systemctl daemon-reload
sudo systemctl enable telegram-alert-bot
sudo systemctl start telegram-alert-bot
sudo systemctl status telegram-alert-bot
# crontab -e
*/5 * * * * /home/ubuntu/scripts/check_health.sh
# crontab -e
*/5 * * * * /home/ubuntu/scripts/check_health.sh
# crontab -e
*/5 * * * * /home/ubuntu/scripts/check_health.sh
- name: Send deployment alert
  run: |
    curl -s -X POST "${{ secrets.ALERT_WEBHOOK_URL }}/alert/${{ secrets.WEBHOOK_SECRET }}" \
      -H "Content-Type: application/json" \
      -d "{\"title\": \"Deployment Complete\", \"message\": \"${{ github.repository }} deployed commit ${{ github.sha }} to production.\", \"severity\": \"info\", \"source\": \"github-actions\"}"
- name: Send deployment alert
  run: |
    curl -s -X POST "${{ secrets.ALERT_WEBHOOK_URL }}/alert/${{ secrets.WEBHOOK_SECRET }}" \
      -H "Content-Type: application/json" \
      -d "{\"title\": \"Deployment Complete\", \"message\": \"${{ github.repository }} deployed commit ${{ github.sha }} to production.\", \"severity\": \"info\", \"source\": \"github-actions\"}"
- name: Send deployment alert
  run: |
    curl -s -X POST "${{ secrets.ALERT_WEBHOOK_URL }}/alert/${{ secrets.WEBHOOK_SECRET }}" \
      -H "Content-Type: application/json" \
      -d "{\"title\": \"Deployment Complete\", \"message\": \"${{ github.repository }} deployed commit ${{ github.sha }} to production.\", \"severity\": \"info\", \"source\": \"github-actions\"}"

- Hire me on Upwork — Python automation, API integrations, trading systems
- Check my Fiverr gigs — Bot development, web scraping, data pipelines