Tools: How to Build a Crypto Derivatives Monitor with CoinGlass API and Python (2026)
The Problem Nobody Talks About
Table of Contents
Why Derivatives Data Matters
Understanding the Four Key Metrics
Funding Rate
Open Interest (OI)
Liquidation Data
Long/Short Ratio
Setting Up CoinGlass API V4
Getting Your API Key
Project Setup
Config File
Building the Data Layer
Building the Signal Engine
Building the Alert Layer
Putting It All Together
Deploying to Production
Option A: Linux VPS (Recommended)
Estimated Running Costs
What to Watch For
Next Steps
Full API Reference Summary
Wrapping Up

Tags: python cryptocurrency api quantitative-finance tutorial
Estimated reading time: 25 minutes | Difficulty: Intermediate

The Problem Nobody Talks About

It's 3 AM. Bitcoin just dropped 18% in 45 minutes. Your long position got liquidated while you were asleep. You wake up to a notification that your account balance is zero.

This isn't a hypothetical. On August 5, 2024, over $1 billion in liquidations hit the market in a single hour. The signals were there beforehand: funding rates at extreme highs, open interest surging, long/short ratio at dangerous levels. But nobody was watching.

The crypto market runs 24/7. You don't.

In this tutorial, I'll show you how to build a production-ready crypto derivatives monitor using the CoinGlass API V4 and Python. By the end, you'll have a system that:

Why Derivatives Data Matters

Most crypto traders watch price charts. The traders who consistently outperform watch something else entirely: the derivatives market.

Here's why: the perpetual futures market in crypto is enormous. On any given day, BTC perpetual swaps alone trade over $50 billion in volume, dwarfing the spot market. This market is driven by leveraged positions, and leveraged positions create predictable pressure points.

When too many traders are positioned on the same side, the market has a structural incentive to move against them. Smart money knows where the liquidations are clustered. Liquidation cascades don't happen randomly; they happen at predictable price levels, triggered by identifiable conditions.

The four metrics we'll monitor are:

None of these metrics are available on a standard price chart. You need a derivatives data API to access them, and that's where CoinGlass comes in.

Understanding the Four Key Metrics

Before writing a single line of code, let's make sure we understand what we're actually measuring.

Funding Rate

Perpetual futures contracts don't expire, which means exchanges need a mechanism to keep their price anchored to the spot market. That mechanism is the funding rate: a periodic payment exchanged between long and short holders.
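To make the funding mechanism concrete, here is a minimal sketch of the arithmetic, assuming the common convention that a positive rate means longs pay shorts and that funding settles every 8 hours. The function names are hypothetical, and individual exchanges differ in how they compute notional value and how often they settle.

```python
def funding_payment_usd(notional_usd: float, funding_rate: float, is_long: bool) -> float:
    """Signed cash flow for one funding interval (negative means you pay).

    funding_rate is a decimal fraction, e.g. 0.001 for 0.1%.
    """
    transfer = notional_usd * funding_rate
    # Positive rate: longs pay shorts. Negative rate: shorts pay longs.
    return -transfer if is_long else transfer


def annualized_funding_pct(funding_rate: float, intervals_per_day: int = 3) -> float:
    """Simple (non-compounded) annualized rate in percent.

    intervals_per_day=3 assumes 8-hour funding intervals.
    """
    return funding_rate * intervals_per_day * 365 * 100


if __name__ == "__main__":
    # A $100k long at +0.1% per interval
    print(funding_payment_usd(100_000, 0.001, is_long=True))
    print(annualized_funding_pct(0.001))
```

At +0.1% per 8 hours, a $100,000 long pays about $100 per interval, which works out to roughly 110% per year if the rate persisted.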
The signal isn't the direction; it's the magnitude. When the average funding rate across exchanges exceeds +0.1% per 8 hours, the cost of holding longs becomes unsustainable. History shows that when funding rates reach these extremes, mean reversion usually follows within 24–72 hours.

Open Interest (OI)

Open interest measures the total value of all open derivative positions: every long and short that hasn't been closed yet. Think of it as the total amount of money currently "at stake" in the market.

OI alone tells you about the size of the market. But OI changes tell you something far more important:

Liquidation Data

When a leveraged position's losses exceed its margin, the exchange forcibly closes it. This is a liquidation. Large-scale liquidations are both a symptom and a cause of price moves.

The dangerous scenario: liquidation cascades. Price falls → triggers long liquidations → liquidation selling pushes price lower → triggers more liquidations → price accelerates downward. We saw this loop play out multiple times in 2024.

CoinGlass tracks liquidation data across 30+ exchanges in real time. Monitoring hourly liquidation volumes gives you an early warning system for cascade events.

Long/Short Ratio

The long/short ratio tells you what percentage of accounts are positioned long vs. short. Critically, CoinGlass provides two versions of this ratio: a global ratio covering all accounts, and a top-trader ratio covering large accounts.

The most powerful signal: when these two diverge. When retail is overwhelmingly long while top traders are quietly building short positions, that's an institutional-grade warning sign.

Setting Up CoinGlass API V4

⚠️ Important: CoinGlass has fully migrated to API V4. All previous API versions are deprecated. Make sure you're using the V4 base URL: https://open-api-v4.coinglass.com

Create a .env file for your credentials:

Building the Data Layer

The data layer is responsible for one thing: fetching clean, structured data from CoinGlass API V4. Each function maps directly to an official V4 endpoint.

Building the Signal Engine

The signal engine reads the snapshot and produces structured alerts.
No AI required, just clean conditional logic based on the thresholds we defined in config.py.

Putting It All Together

Run it locally to test:

You should see output like:

Deploying to Production

For 24/7 operation, deploy to a small cloud instance. Any $5–10/month VPS works, for example DigitalOcean, Linode, AWS EC2 t3.micro, or Alibaba Cloud.

What to Watch For

Now that your monitor is running, here's a quick field guide to interpreting what you'll see:

Next Steps

You now have a working crypto derivatives monitor. Here's how to take it further:

1. Add historical backtesting
Use get_funding_rate_history() and get_open_interest_history() with limit=200 to pull months of data. Run your signal engine against historical snapshots to measure how often each signal preceded a significant price move.

2. Add more symbols
The monitor supports any symbol listed in SYMBOLS in config.py. Add "SOL" or "XRP" alongside the default "BTC" and "ETH"; CoinGlass provides the same depth of data for all major assets.

3. Add a web dashboard
Replace Telegram with a simple Streamlit dashboard. Use st.metric() for the live numbers and st.dataframe() for signal history.

4. Connect to execution
Once you trust your signals, connect them to an exchange API (Binance, OKX, Bybit all have Python SDKs). Add position sizing logic and you have the skeleton of a live trading system.

5. Layer in AI analysis
Pass the snapshot dictionary to Claude or GPT-4o with a structured prompt. The model can provide natural-language context that rules-based logic misses, which is especially useful for identifying novel market regimes.

Full API Reference Summary

Base URL: https://open-api-v4.coinglass.com
Auth header: CG-API-KEY: your_api_key
Valid intervals: m1 m5 m15 m30 h1 h4 h8 h24

Wrapping Up

The crypto market never sleeps, and neither should your monitoring system.

What we built today is a solid foundation: clean data fetching from CoinGlass API V4, a rule-based signal engine covering the five most important derivatives metrics, and a Telegram alert system with cooldown logic to avoid alert fatigue.

The real edge isn't in any single signal; it's in the combination. When funding rates are extreme, OI is surging, and top traders are positioned opposite to retail, you're looking at a convergence of evidence that's historically been a reliable warning sign.

Data is the moat. Build it early.

🔑 Get your CoinGlass API key: coinglass.com/pricing
📘 CoinGlass API V4 Docs: docs.coinglass.com
⭐ If this helped you, drop a reaction on Dev.to. It helps other developers find the article.

Built with CoinGlass API V4 · Python 3.10+ · Not financial advice
┌─────────────────────────────────────────────────────────┐
│           The Derivatives Intelligence Stack            │
├─────────────────────┬───────────────────────────────────┤
│ Metric              │ What It Tells You                 │
├─────────────────────┼───────────────────────────────────┤
│ Funding Rate        │ Cost of holding leveraged         │
│                     │ positions; extreme values =       │
│                     │ sentiment at limits               │
├─────────────────────┼───────────────────────────────────┤
│ Open Interest       │ Total capital in open positions;  │
│                     │ surges signal incoming moves      │
├─────────────────────┼───────────────────────────────────┤
│ Liquidation Data    │ Forced position closures;         │
│                     │ cascades amplify price moves      │
├─────────────────────┼───────────────────────────────────┤
│ Long/Short Ratio    │ Directional bias of market        │
│                     │ participants; extremes =          │
│                     │ contrarian signals                │
└─────────────────────┴───────────────────────────────────┘
Funding Rate Interpretation Guide
───────────────────────────────────────────────────────────
Rate               Interpretation               Risk Level
─────────          ──────────────────           ──────────
> +0.10%         → Extreme greed                🔴 HIGH (longs at risk)
+0.05 to +0.10%    Elevated bullish sentiment   🟡 MEDIUM
-0.05 to +0.05%    Neutral / balanced           🟢 LOW
-0.05 to -0.10%    Elevated bearish sentiment   🟡 MEDIUM
< -0.10%         → Extreme fear                 🔴 HIGH (shorts at risk)
───────────────────────────────────────────────────────────
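The interpretation guide above can be expressed as a small lookup function. This is only a sketch: the function name is hypothetical, the input is assumed to be a rate in percent per funding interval, and the guide does not say which band an exact boundary value (such as +0.05%) belongs to, so the boundary handling here is an assumption.

```python
def classify_funding_rate(rate_pct: float) -> tuple[str, str]:
    """Map a funding rate (in percent) to (interpretation, risk level).

    Band edges follow the guide; assigning exact boundary values
    to the MEDIUM bands is an assumption of this sketch.
    """
    if rate_pct > 0.10:
        return ("Extreme greed", "HIGH")
    if rate_pct >= 0.05:
        return ("Elevated bullish sentiment", "MEDIUM")
    if rate_pct > -0.05:
        return ("Neutral / balanced", "LOW")
    if rate_pct >= -0.10:
        return ("Elevated bearish sentiment", "MEDIUM")
    return ("Extreme fear", "HIGH")


if __name__ == "__main__":
    for sample in (0.12, 0.07, 0.0, -0.07, -0.2):
        print(sample, classify_funding_rate(sample))
```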
# Create project directory
mkdir crypto-derivatives-monitor
cd crypto-derivatives-monitor

# Create virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install dependencies
pip install requests schedule python-dotenv anthropic
COINGLASS_API_KEY=your_coinglass_api_key_here
TELEGRAM_BOT_TOKEN=your_telegram_bot_token
TELEGRAM_CHAT_ID=your_telegram_chat_id
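Because a missing or placeholder credential only shows up later as a failed request, it can help to check the environment once at startup. The helper below is a hypothetical addition, not part of the original project; it assumes the three variable names from the .env file above and treats values that still start with "your_" as unset.

```python
import os

REQUIRED_VARS = ("COINGLASS_API_KEY", "TELEGRAM_BOT_TOKEN", "TELEGRAM_CHAT_ID")


def missing_env_vars(env=None) -> list[str]:
    """Return the names of required variables that are unset or still placeholders."""
    env = os.environ if env is None else env
    missing = []
    for name in REQUIRED_VARS:
        value = env.get(name, "")
        # The sample .env uses values like "your_telegram_chat_id"
        if not value or value.startswith("your_"):
            missing.append(name)
    return missing


if __name__ == "__main__":
    problems = missing_env_vars()
    if problems:
        raise SystemExit(f"Missing credentials: {', '.join(problems)}")
    print("All credentials present")
```

Call it after load_dotenv() so values from the .env file are already in the environment.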
crypto-derivatives-monitor/
├── .env
├── config.py
├── data_layer.py
├── signal_engine.py
├── alert_layer.py
├── monitor.py
└── requirements.txt
# config.py
import os
from dotenv import load_dotenv

load_dotenv()

# CoinGlass API V4
BASE_URL = "https://open-api-v4.coinglass.com"
API_KEY = os.getenv("COINGLASS_API_KEY")
HEADERS = {
    "CG-API-KEY": API_KEY,
    "Accept": "application/json",
}

# Telegram
TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")

# Monitoring settings
SYMBOLS = ["BTC", "ETH"]
POLL_INTERVAL = 5         # minutes
DEFAULT_INTERVAL = "h1"   # candle size for history endpoints
DEFAULT_LIMIT = 2         # enough to calculate 1-period change

# Signal thresholds
FUNDING_RATE_HIGH = 0.10        # % (extreme greed)
FUNDING_RATE_LOW = -0.05        # % (extreme fear)
OI_CHANGE_THRESHOLD = 5.0       # % change in 1 hour
LIQ_THRESHOLD_USD = 50_000_000  # $50M single-side liquidation
LONG_RATIO_HIGH = 65.0          # % (too many longs)
SHORT_RATIO_HIGH = 60.0         # % (too many shorts)
TOP_DIVERGENCE_GAP = 10.0       # % gap between retail and top traders
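As an illustration of how a threshold like TOP_DIVERGENCE_GAP might be applied, here is a minimal sketch of a retail versus top-trader divergence check. The function name and return shape are hypothetical, and the article's own signal engine may implement this differently. The constant is repeated so the snippet runs on its own.

```python
from typing import Optional

TOP_DIVERGENCE_GAP = 10.0  # % gap, same value as in config.py


def long_short_divergence(global_long_pct: float,
                          top_long_pct: float,
                          gap: float = TOP_DIVERGENCE_GAP) -> Optional[dict]:
    """Return a small signal dict when the two long percentages differ by at least `gap`.

    global_long_pct: share of all accounts positioned long, in percent
    top_long_pct:    share of top-trader accounts positioned long, in percent
    """
    diff = global_long_pct - top_long_pct
    if abs(diff) < gap:
        return None  # the two groups broadly agree
    return {
        "gap_pct": round(diff, 2),
        # Positive diff: retail is more long than top traders
        "retail_bias": "long" if diff > 0 else "short",
    }


if __name__ == "__main__":
    print(long_short_divergence(68.0, 52.0))
    print(long_short_divergence(55.0, 50.0))
```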
# data_layer.py
import logging
from datetime import datetime, timezone

import requests

from config import BASE_URL, HEADERS

logger = logging.getLogger(__name__)


def _get(endpoint: str, params: dict) -> list:
    """
    Generic GET helper with error handling.
    Returns the 'data' array from the CoinGlass API response,
    or an empty list on any failure.
    """
    url = f"{BASE_URL}{endpoint}"
    try:
        resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
        resp.raise_for_status()
        # "data" may be missing or null on API-level errors; normalize to a list
        return resp.json().get("data") or []
    except requests.exceptions.Timeout:
        logger.error(f"Timeout on {endpoint}")
        return []
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error on {endpoint}: {e.response.status_code}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error on {endpoint}: {e}")
        return []


# ── Funding Rate ──────────────────────────────────────────────────────────────

def get_funding_rate_by_exchange(symbol: str = "BTC") -> dict:
    """
    Fetch current funding rates across all exchanges.

    Endpoint: GET /api/futures/fundingRate/exchange-list

    Params:
        symbol (str): Asset symbol e.g. "BTC", "ETH"

    Response fields per item:
        exchangeName    : Exchange identifier
        fundingRate     : Current rate (decimal, e.g. 0.0001 = 0.01%)
        nextFundingTime : Unix timestamp (ms) of next settlement
    """
    data = _get("/api/futures/fundingRate/exchange-list", {"symbol": symbol})
    if not data:
        return {}

    # Skip exchanges that report no rate; convert decimals to percent
    rates = [float(x["fundingRate"]) * 100 for x in data if x.get("fundingRate") is not None]
    avg = sum(rates) / len(rates) if rates else 0.0

    return {
        "avg_funding_rate_pct": round(avg, 6),
        "exchange_count": len(rates),
        "max_rate_pct": round(max(rates), 6) if rates else 0,
        "min_rate_pct": round(min(rates), 6) if rates else 0,
        "exchanges": [
            {
                "name": x.get("exchangeName", ""),
                # "or 0" guards against an explicit null rate
                "rate_pct": round(float(x.get("fundingRate") or 0) * 100, 6),
                "next_funding": x.get("nextFundingTime"),
            }
            for x in data[:8]
        ],
    }


def get_funding_rate_history(symbol: str = "BTC", interval: str = "h1", limit: int = 24) -> list:
    """
    Fetch OHLC history of funding rates.

    Endpoint: GET /api/futures/fundingRate/ohlc-history

    Params:
        symbol (str): Asset symbol
        interval (str): Candle size: m1 m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles to return (max 200)

    Response fields per item:
        t          : timestamp (ms)
        o, h, l, c : open/high/low/close funding rate (decimal)
    """
    return _get(
        "/api/futures/fundingRate/ohlc-history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )


# ── Open Interest ─────────────────────────────────────────────────────────────

def get_open_interest_history(symbol: str = "BTC", interval: str = "h1", limit: int = 2) -> dict:
    """
    Fetch open interest OHLC history and compute period change.

    Endpoint: GET /api/futures/openInterest/ohlc-history

    Params:
        symbol (str): Asset symbol
        interval (str): Candle size: m1 m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles (min 2 to compute change)

    Response fields per item:
        t          : timestamp (ms)
        o, h, l, c : open interest in USD
    """
    data = _get(
        "/api/futures/openInterest/ohlc-history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if len(data) < 2:
        return {}

    latest = data[-1]
    previous = data[-2]
    # float() keeps the arithmetic safe if values arrive as numeric strings
    latest_close = float(latest["c"])
    previous_close = float(previous["c"])
    change = (latest_close - previous_close) / previous_close * 100 if previous_close else 0

    return {
        "current_oi_usd": latest_close,
        "previous_oi_usd": previous_close,
        "change_pct": round(change, 3),
        "period_high_usd": latest["h"],
        "period_low_usd": latest["l"],
        "timestamp_ms": latest["t"],
    }


# ── Liquidations ──────────────────────────────────────────────────────────────

def get_liquidation_history(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch aggregated liquidation history (longs + shorts).

    Endpoint: GET /api/futures/liquidation/history

    Params:
        symbol (str): Asset symbol
        interval (str): Candle size: m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles

    Response fields per item:
        t           : timestamp (ms)
        longLiqUsd  : Long liquidations in USD
        shortLiqUsd : Short liquidations in USD
    """
    data = _get(
        "/api/futures/liquidation/history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if not data:
        return {}

    latest = data[-1]
    long_liq = float(latest.get("longLiqUsd", 0))
    short_liq = float(latest.get("shortLiqUsd", 0))
    total = long_liq + short_liq
    smaller_side = min(long_liq, short_liq)

    return {
        "long_liq_usd": long_liq,
        "short_liq_usd": short_liq,
        "total_liq_usd": total,
        "dominant_side": "longs" if long_liq > short_liq else "shorts",
        # 0 means the ratio is undefined because one side had no liquidations
        "imbalance_ratio": round(max(long_liq, short_liq) / smaller_side, 2) if smaller_side > 0 else 0,
        "timestamp_ms": latest.get("t"),
    }


# ── Long / Short Ratios ───────────────────────────────────────────────────────

def get_global_long_short_ratio(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch global (all accounts) long/short account ratio history.

    Endpoint: GET /api/futures/global-long-short-account-ratio/history

    Params:
        symbol (str): Asset symbol
        interval (str): Candle size: m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles

    Response fields per item:
        t          : timestamp (ms)
        longRatio  : Fraction of accounts long (0 to 1)
        shortRatio : Fraction of accounts short (0 to 1)
    """
    data = _get(
        "/api/futures/global-long-short-account-ratio/history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if not data:
        return {}

    latest = data[-1]
    return {
        "long_pct": round(float(latest.get("longRatio", 0)) * 100, 2),
        "short_pct": round(float(latest.get("shortRatio", 0)) * 100, 2),
        "timestamp_ms": latest.get("t"),
    }


def get_top_trader_long_short_ratio(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch top trader (large accounts) long/short account ratio history.

    Endpoint: GET /api/futures/top-long-short-account-ratio/history

    Params: identical to global ratio endpoint above
    Response fields: identical to global ratio endpoint above
    """
    data = _get(
        "/api/futures/top-long-short-account-ratio/history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if not data:
        return {}

    latest = data[-1]
    return {
        "long_pct": round(float(latest.get("longRatio", 0)) * 100, 2),
        "short_pct": round(float(latest.get("shortRatio", 0)) * 100, 2),
        "timestamp_ms": latest.get("t"),
    }


def get_taker_buy_sell_volume(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch taker (market order) buy vs. sell volume history.

    Endpoint: GET /api/futures/taker-buy-sell-volume/history

    Params:
        symbol (str): Asset symbol
        interval (str): Candle size: m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles

    Response fields per item:
        t       : timestamp (ms)
        buyVol  : Taker buy volume in USD
        sellVol : Taker sell volume in USD
    """
    data = _get(
        "/api/futures/taker-buy-sell-volume/history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if not data:
        return {}

    latest = data[-1]
    buy_vol = float(latest.get("buyVol", 0))
    sell_vol = float(latest.get("sellVol", 0))
    total = buy_vol + sell_vol

    return {
        "buy_vol_usd": buy_vol,
        "sell_vol_usd": sell_vol,
        "buy_pct": round(buy_vol / total * 100, 2) if total else 50.0,
        "sell_pct": round(sell_vol / total * 100, 2) if total else 50.0,
        "bias": "buy-side" if buy_vol > sell_vol else "sell-side",
        "timestamp_ms": latest.get("t"),
    }


# ── Composite Data Package ────────────────────────────────────────────────────

def build_market_snapshot(symbol: str = "BTC") -> dict:
    """
    Assemble a complete derivatives market snapshot for one symbol.
    Calls six endpoints and returns a unified dict.
    """
    logger.info(f"Fetching snapshot for {symbol}...")
    return {
        "symbol": symbol,
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "funding_rate": get_funding_rate_by_exchange(symbol),
        "open_interest": get_open_interest_history(symbol),
        "liquidation": get_liquidation_history(symbol),
        "global_ls": get_global_long_short_ratio(symbol),
        "top_trader_ls": get_top_trader_long_short_ratio(symbol),
        "taker_volume": get_taker_buy_sell_volume(symbol),
    }
# data_layer.py
import logging
from datetime import datetime, timezone

import requests

from config import BASE_URL, HEADERS

logger = logging.getLogger(__name__)


def _get(endpoint: str, params: dict) -> list:
    """
    Generic GET helper with error handling.
    Returns the 'data' array from the CoinGlass API response,
    or an empty list on any failure.
    """
    url = f"{BASE_URL}{endpoint}"
    try:
        resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
        resp.raise_for_status()
        # 'data' can be missing or null on an error payload
        return resp.json().get("data") or []
    except requests.exceptions.Timeout:
        logger.error(f"Timeout on {endpoint}")
        return []
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error on {endpoint}: {e.response.status_code}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error on {endpoint}: {e}")
        return []


# ── Funding Rate ──────────────────────────────────────────────────────────────

def get_funding_rate_by_exchange(symbol: str = "BTC") -> dict:
    """
    Fetch current funding rates across all exchanges.

    Endpoint: GET /api/futures/fundingRate/exchange-list
    Params:
        symbol (str): Asset symbol e.g. "BTC", "ETH"
    Response fields per item:
        exchangeName: Exchange identifier
        fundingRate: Current rate (decimal, e.g. 0.0001 = 0.01%)
        nextFundingTime: Unix timestamp (ms) of next settlement
    """
    data = _get("/api/futures/fundingRate/exchange-list", {"symbol": symbol})
    if not data:
        return {}

    # Convert decimals to percent, skipping exchanges with no rate
    rates = [float(x["fundingRate"]) * 100 for x in data if x.get("fundingRate") is not None]
    avg = sum(rates) / len(rates) if rates else 0.0

    return {
        "avg_funding_rate_pct": round(avg, 6),
        "exchange_count": len(rates),
        "max_rate_pct": round(max(rates), 6) if rates else 0,
        "min_rate_pct": round(min(rates), 6) if rates else 0,
        "exchanges": [
            {
                "name": x.get("exchangeName", ""),
                # 'or 0' guards against a key that is present but null
                "rate_pct": round(float(x.get("fundingRate") or 0) * 100, 6),
                "next_funding": x.get("nextFundingTime"),
            }
            for x in data[:8]
        ],
    }


def get_funding_rate_history(symbol: str = "BTC", interval: str = "h1", limit: int = 24) -> list:
    """
    Fetch OHLC history of funding rates.

    Endpoint: GET /api/futures/fundingRate/ohlc-history
    Params:
        symbol (str): Asset symbol
        interval (str): Candle size, one of m1 m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles to return (max 200)
    Response fields per item:
        t: timestamp (ms)
        o, h, l, c: open/high/low/close funding rate (decimal)
    """
    return _get(
        "/api/futures/fundingRate/ohlc-history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )


# ── Open Interest ─────────────────────────────────────────────────────────────

def get_open_interest_history(symbol: str = "BTC", interval: str = "h1", limit: int = 2) -> dict:
    """
    Fetch open interest OHLC history and compute period change.

    Endpoint: GET /api/futures/openInterest/ohlc-history
    Params:
        symbol (str): Asset symbol
        interval (str): Candle size, one of m1 m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles (min 2 to compute change)
    Response fields per item:
        t: timestamp (ms)
        o, h, l, c: open interest in USD
    """
    data = _get(
        "/api/futures/openInterest/ohlc-history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if len(data) < 2:
        return {}

    latest = data[-1]
    previous = data[-2]
    latest_close = float(latest["c"])
    previous_close = float(previous["c"])
    # Percent change between the last two candle closes
    change = (latest_close - previous_close) / previous_close * 100 if previous_close else 0

    return {
        "current_oi_usd": latest_close,
        "previous_oi_usd": previous_close,
        "change_pct": round(change, 3),
        "period_high_usd": latest["h"],
        "period_low_usd": latest["l"],
        "timestamp_ms": latest["t"],
    }


# ── Liquidations ──────────────────────────────────────────────────────────────

def get_liquidation_history(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch aggregated liquidation history (longs + shorts).

    Endpoint: GET /api/futures/liquidation/history
    Params:
        symbol (str): Asset symbol
        interval (str): Candle size, one of m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles
    Response fields per item:
        t: timestamp (ms)
        longLiqUsd: Long liquidations in USD
        shortLiqUsd: Short liquidations in USD
    """
    data = _get(
        "/api/futures/liquidation/history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if not data:
        return {}

    latest = data[-1]
    long_liq = float(latest.get("longLiqUsd") or 0)
    short_liq = float(latest.get("shortLiqUsd") or 0)
    total = long_liq + short_liq
    smaller = min(long_liq, short_liq)

    return {
        "long_liq_usd": long_liq,
        "short_liq_usd": short_liq,
        "total_liq_usd": total,
        "dominant_side": "longs" if long_liq > short_liq else "shorts",
        # 0 means the ratio is undefined because one side had no liquidations
        "imbalance_ratio": round(max(long_liq, short_liq) / smaller, 2) if smaller > 0 else 0,
        "timestamp_ms": latest.get("t"),
    }


# ── Long / Short Ratios ───────────────────────────────────────────────────────

def _latest_long_short(endpoint: str, symbol: str, interval: str, limit: int) -> dict:
    """Shared parser for the two long/short ratio endpoints (identical response shape)."""
    data = _get(endpoint, {"symbol": symbol, "interval": interval, "limit": limit})
    if not data:
        return {}
    latest = data[-1]
    return {
        "long_pct": round(float(latest.get("longRatio") or 0) * 100, 2),
        "short_pct": round(float(latest.get("shortRatio") or 0) * 100, 2),
        "timestamp_ms": latest.get("t"),
    }


def get_global_long_short_ratio(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch global (all accounts) long/short account ratio history.

    Endpoint: GET /api/futures/global-long-short-account-ratio/history
    Params:
        symbol (str): Asset symbol
        interval (str): Candle size, one of m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles
    Response fields per item:
        t: timestamp (ms)
        longRatio: Fraction of accounts long (0 to 1)
        shortRatio: Fraction of accounts short (0 to 1)
    """
    return _latest_long_short(
        "/api/futures/global-long-short-account-ratio/history", symbol, interval, limit
    )


def get_top_trader_long_short_ratio(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch top trader (large accounts) long/short account ratio history.

    Endpoint: GET /api/futures/top-long-short-account-ratio/history
    Params: identical to global ratio endpoint above
    Response fields: identical to global ratio endpoint above
    """
    return _latest_long_short(
        "/api/futures/top-long-short-account-ratio/history", symbol, interval, limit
    )


def get_taker_buy_sell_volume(symbol: str = "BTC", interval: str = "h1", limit: int = 1) -> dict:
    """
    Fetch taker (market order) buy vs. sell volume history.

    Endpoint: GET /api/futures/taker-buy-sell-volume/history
    Params:
        symbol (str): Asset symbol
        interval (str): Candle size, one of m5 m15 m30 h1 h4 h8 h24
        limit (int): Number of candles
    Response fields per item:
        t: timestamp (ms)
        buyVol: Taker buy volume in USD
        sellVol: Taker sell volume in USD
    """
    data = _get(
        "/api/futures/taker-buy-sell-volume/history",
        {"symbol": symbol, "interval": interval, "limit": limit},
    )
    if not data:
        return {}

    latest = data[-1]
    buy_vol = float(latest.get("buyVol") or 0)
    sell_vol = float(latest.get("sellVol") or 0)
    total = buy_vol + sell_vol

    return {
        "buy_vol_usd": buy_vol,
        "sell_vol_usd": sell_vol,
        "buy_pct": round(buy_vol / total * 100, 2) if total else 50.0,
        "sell_pct": round(sell_vol / total * 100, 2) if total else 50.0,
        "bias": "buy-side" if buy_vol > sell_vol else "sell-side",
        "timestamp_ms": latest.get("t"),
    }


# ── Composite Data Package ────────────────────────────────────────────────────

def build_market_snapshot(symbol: str = "BTC") -> dict:
    """
    Assemble a complete derivatives market snapshot for one symbol.
    Calls six endpoints and returns a unified dict. Any section whose
    request failed comes back as an empty dict.
    """
    logger.info(f"Fetching snapshot for {symbol}...")
    return {
        "symbol": symbol,
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "funding_rate": get_funding_rate_by_exchange(symbol),
        "open_interest": get_open_interest_history(symbol),
        "liquidation": get_liquidation_history(symbol),
        "global_ls": get_global_long_short_ratio(symbol),
        "top_trader_ls": get_top_trader_long_short_ratio(symbol),
        "taker_volume": get_taker_buy_sell_volume(symbol),
    }
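The open interest change is the one derived number in the data layer that is easy to get subtly wrong, so it helps to check it offline before spending API calls. Here is a minimal sketch that isolates the same last-two-closes calculation in a pure function. The name `oi_change_pct` and the sample candles are hypothetical and only illustrate the arithmetic; they are not part of the CoinGlass response.

```python
def oi_change_pct(candles):
    """Percent change between the last two candle closes.

    Returns None when fewer than two candles are available or when the
    previous close is zero, so callers can tell "no data" from "0% change".
    """
    if len(candles) < 2:
        return None
    previous_close = float(candles[-2]["c"])
    latest_close = float(candles[-1]["c"])
    if previous_close == 0:
        return None
    return round((latest_close - previous_close) / previous_close * 100, 3)


# Hypothetical candles: open interest rises from $20B to $21B
sample = [
    {"t": 1_700_000_000_000, "c": 20_000_000_000},
    {"t": 1_700_003_600_000, "c": 21_000_000_000},
]
print(oi_change_pct(sample))      # 5.0
print(oi_change_pct(sample[:1]))  # None (not enough candles)
```

Returning `None` instead of `0` for the "cannot compute" case is a design choice worth considering for the real function too, since a flat 0% reading and a failed request would otherwise look identical to the signal engine.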
# signal_engine.py
from dataclasses import dataclass, field
from typing import List

from config import (
    FUNDING_RATE_HIGH,
    FUNDING_RATE_LOW,
    OI_CHANGE_THRESHOLD,
    LIQ_THRESHOLD_USD,
    LONG_RATIO_HIGH,
    SHORT_RATIO_HIGH,
    TOP_DIVERGENCE_GAP,
)


@dataclass
class Signal:
    level: str  # "HIGH" | "MEDIUM" | "LOW"
    title: str
    message: str
    emoji: str = "⚪"

    def __post_init__(self):
        # Derive the emoji from the level so callers never set it by hand
        self.emoji = {"HIGH": "🔴", "MEDIUM": "🟡", "LOW": "🟢"}.get(self.level, "⚪")


@dataclass
class MarketSignals:
    symbol: str
    signals: List[Signal] = field(default_factory=list)
    max_level: str = "NONE"

    def add(self, signal: Signal):
        """Store a signal and keep track of the highest severity seen so far."""
        self.signals.append(signal)
        priority = {"HIGH": 3, "MEDIUM": 2, "LOW": 1, "NONE": 0}
        if priority.get(signal.level, 0) > priority.get(self.max_level, 0):
            self.max_level = signal.level

    @property
    def has_alerts(self) -> bool:
        return len(self.signals) > 0

    @property
    def top_emoji(self) -> str:
        return {"HIGH": "🔴", "MEDIUM": "🟡", "LOW": "🟢", "NONE": "⚪"}.get(self.max_level, "⚪")


def analyze_snapshot(snapshot: dict) -> MarketSignals:
    """
    Run all signal checks against a market snapshot.
    Returns a MarketSignals object containing all triggered signals.
    """
    symbol = snapshot.get("symbol", "???")
    result = MarketSignals(symbol=symbol)

    fr = snapshot.get("funding_rate", {})
    oi = snapshot.get("open_interest", {})
    liq = snapshot.get("liquidation", {})
    gls = snapshot.get("global_ls", {})
    tls = snapshot.get("top_trader_ls", {})
    tkv = snapshot.get("taker_volume", {})

    # ── Signal 1: Funding Rate Extremes ──────────────────────────────────────
    avg_fr = fr.get("avg_funding_rate_pct", 0)
    if avg_fr > FUNDING_RATE_HIGH:
        result.add(Signal(
            level="HIGH",
            title="Extreme Positive Funding Rate",
            message=(
                f"Average funding rate across {fr.get('exchange_count', '?')} exchanges "
                f"is **{avg_fr:.4f}%**, well above the {FUNDING_RATE_HIGH}% danger threshold. "
                f"Longs are paying an unsustainable premium. Historical mean-reversion "
                f"probability within 48h is elevated."
            ),
        ))
    elif avg_fr < FUNDING_RATE_LOW:
        result.add(Signal(
            level="HIGH",
            title="Extreme Negative Funding Rate",
            message=(
                f"Average funding rate is **{avg_fr:.4f}%**, below {FUNDING_RATE_LOW}%. "
                f"Shorts are paying heavily to maintain positions. "
                f"Short squeeze conditions are forming."
            ),
        ))
    elif abs(avg_fr) > FUNDING_RATE_HIGH * 0.6:
        result.add(Signal(
            level="MEDIUM",
            title="Elevated Funding Rate",
            message=f"Funding rate at {avg_fr:.4f}%, approaching extreme territory. Monitor closely.",
        ))

    # ── Signal 2: Open Interest Surge ────────────────────────────────────────
    oi_change = oi.get("change_pct", 0)
    if abs(oi_change) > OI_CHANGE_THRESHOLD:
        direction = "surged" if oi_change > 0 else "dropped"
        side = (
            "new leveraged positions entering"
            if oi_change > 0
            else "mass position closure / deleveraging"
        )
        result.add(Signal(
            level="HIGH",
            title=f"Open Interest {direction.title()} {abs(oi_change):.1f}%",
            message=(
                f"OI changed by **{oi_change:+.2f}%** in the last hour "
                f"(${oi.get('current_oi_usd', 0):,.0f} total). "
                f"This signals {side}. "
                f"A directional breakout or breakdown is likely imminent."
            ),
        ))
    elif abs(oi_change) > OI_CHANGE_THRESHOLD * 0.5:
        result.add(Signal(
            level="MEDIUM",
            title=f"Open Interest Moving: {oi_change:+.1f}%",
            message=f"OI trending {'up' if oi_change > 0 else 'down'}. Watch for continuation.",
        ))

    # ── Signal 3: Liquidation Cascade ────────────────────────────────────────
    long_liq = liq.get("long_liq_usd", 0)
    short_liq = liq.get("short_liq_usd", 0)
    total_liq = liq.get("total_liq_usd", 0)
    if long_liq > LIQ_THRESHOLD_USD:
        result.add(Signal(
            level="HIGH",
            title=f"Long Liquidation Cascade: ${long_liq/1e6:.1f}M",
            message=(
                f"**${long_liq/1e6:.1f}M** in long positions were liquidated in the past hour. "
                # short_liq is what was liquidated, not what is still open
                f"Shorts liquidated over the same period: ${short_liq/1e6:.1f}M. "
                f"Forced selling pressure may continue."
            ),
        ))
    elif short_liq > LIQ_THRESHOLD_USD:
        result.add(Signal(
            level="HIGH",
            title=f"Short Liquidation Cascade: ${short_liq/1e6:.1f}M",
            message=(
                f"**${short_liq/1e6:.1f}M** in short positions were liquidated in the past hour. "
                f"Short squeeze may be in progress, with forced buybacks adding upward pressure."
            ),
        ))
    elif total_liq > LIQ_THRESHOLD_USD * 0.5:
        result.add(Signal(
            level="MEDIUM",
            title=f"Elevated Liquidations: ${total_liq/1e6:.1f}M",
            message=f"Total liquidations ${total_liq/1e6:.1f}M in past hour. Volatility is elevated.",
        ))

    # ── Signal 4: Long/Short Ratio Extreme ───────────────────────────────────
    global_long = gls.get("long_pct", 50)
    global_short = gls.get("short_pct", 50)
    if global_long > LONG_RATIO_HIGH:
        result.add(Signal(
            level="HIGH",
            title=f"Retail Extremely Long: {global_long:.1f}%",
            message=(
                f"**{global_long:.1f}%** of all accounts are long. "
                f"When retail sentiment reaches this extreme, contrarian moves are historically common. "
                f"Avoid adding long leverage here."
            ),
        ))
    elif global_short > SHORT_RATIO_HIGH:
        result.add(Signal(
            level="MEDIUM",
            title=f"Retail Heavily Short: {global_short:.1f}%",
            message=(
                f"**{global_short:.1f}%** of accounts are short. "
                f"Extreme short positioning often precedes short squeezes. "
                f"Monitor for bullish catalysts."
            ),
        ))

    # ── Signal 5: Smart Money Divergence (most valuable signal) ──────────────
    top_long = tls.get("long_pct", 50)
    top_short = tls.get("short_pct", 50)
    divergence = abs(top_long - global_long)
    if divergence >= TOP_DIVERGENCE_GAP:
        if top_long > global_long:
            result.add(Signal(
                level="HIGH",
                title="Smart Money Divergence: Top Traders Long vs Retail Short",
                message=(
                    f"Top traders: **{top_long:.1f}% long** | Retail: **{global_long:.1f}% long**. "
                    f"Gap of {divergence:.1f} percentage points. "
                    f"Large accounts are positioning opposite to retail, historically the highest-value setup. "
                    f"Potential accumulation in progress."
                ),
            ))
        else:
            result.add(Signal(
                level="HIGH",
                title="Smart Money Divergence: Top Traders Short vs Retail Long",
                message=(
                    f"Top traders: **{top_short:.1f}% short** | Retail: **{global_long:.1f}% long**. "
                    f"Gap of {divergence:.1f} percentage points. "
                    f"Large accounts are fading retail longs, historically a distribution signal."
                ),
            ))

    # ── Signal 6: Taker Volume Bias ──────────────────────────────────────────
    buy_pct = tkv.get("buy_pct", 50)
    if buy_pct > 65:
        result.add(Signal(
            level="MEDIUM",
            title=f"Strong Taker Buy Bias: {buy_pct:.1f}%",
            message=f"Market orders are {buy_pct:.1f}% buys. Aggressive buying pressure detected.",
        ))
    elif buy_pct < 35:
        result.add(Signal(
            level="MEDIUM",
            title=f"Strong Taker Sell Bias: {100-buy_pct:.1f}%",
            message=f"Market orders are {100-buy_pct:.1f}% sells. Aggressive selling pressure detected.",
        ))

    return result
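Each check in the signal engine follows the same three-tier pattern: an extreme on either side is HIGH, a value past 60% of the upper threshold is MEDIUM, and anything else raises nothing. The sketch below pulls the funding rate tiers out into a standalone function so the boundaries can be tested without a live snapshot. The function name `classify_funding` and the default thresholds (0.05% and -0.03%) are placeholders chosen for illustration; substitute whatever `FUNDING_RATE_HIGH` and `FUNDING_RATE_LOW` are set to in your config.

```python
def classify_funding(avg_pct, high=0.05, low=-0.03):
    """Map an average funding rate (in percent) to an alert level.

    'high' and 'low' are illustrative placeholder thresholds, not
    values taken from the tutorial's config file.
    """
    if avg_pct > high or avg_pct < low:
        return "HIGH"
    # MEDIUM band: within the extremes but past 60% of the upper threshold
    if abs(avg_pct) > high * 0.6:
        return "MEDIUM"
    return "NONE"


for rate in (0.08, 0.035, 0.01, -0.04):
    print(f"{rate:+.3f}% -> {classify_funding(rate)}")
```

Note that the MEDIUM band is derived from the upper threshold only. With an asymmetric lower threshold, a negative rate can cross into HIGH before it ever passes through MEDIUM, which may or may not be what you want.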
# alert_layer.py
import logging
from datetime import datetime, timezone

import requests

from config import TELEGRAM_TOKEN, TELEGRAM_CHAT_ID

logger = logging.getLogger(__name__)


def _fmt(value, spec: str, fallback: str = "N/A") -> str:
    """Format a number with the given spec, or return a fallback when it is missing.

    Applying a numeric format spec directly to a string default such as 'N/A'
    raises ValueError, which would crash the alert whenever a request failed.
    """
    return format(value, spec) if isinstance(value, (int, float)) else fallback


def format_alert_message(snapshot: dict, signals) -> str:
    """Format a structured Telegram message from snapshot + signals."""
    symbol = snapshot.get("symbol", "???")
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    fr = snapshot.get("funding_rate", {})
    oi = snapshot.get("open_interest", {})
    liq = snapshot.get("liquidation", {})
    gls = snapshot.get("global_ls", {})
    tls = snapshot.get("top_trader_ls", {})

    # Header
    lines = [
        f"{signals.top_emoji} *{symbol} Derivatives Alert*",
        f"`{ts}`",
        "",
        "📊 *Market Snapshot*",
        f"├ Funding Rate (avg): `{_fmt(fr.get('avg_funding_rate_pct'), '.4f')}%`",
        f"├ Open Interest: `${_fmt(oi.get('current_oi_usd'), ',.0f')}` "
        f"({_fmt(oi.get('change_pct'), '+.2f')}% 1H)",
        f"├ Liquidations (1H): `${_fmt(liq.get('total_liq_usd'), ',.0f')}` "
        f"({liq.get('dominant_side', '?')} dominated)",
        f"├ Retail L/S: `{gls.get('long_pct', '?')}% long / {gls.get('short_pct', '?')}% short`",
        f"└ Top Traders L/S: `{tls.get('long_pct', '?')}% long / {tls.get('short_pct', '?')}% short`",
        "",
        f"🚨 *Signals ({signals.max_level})*",
    ]

    # Signals
    for s in signals.signals:
        lines.append(f"\n{s.emoji} *{s.title}*")
        lines.append(s.message)

    lines += [
        "",
        "_Data: CoinGlass API V4 | Not financial advice_",
    ]
    return "\n".join(lines)


def send_telegram(message: str) -> bool:
    """Send a message to the configured Telegram chat."""
    if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
        logger.warning("Telegram credentials not configured. Skipping push.")
        return False
    try:
        resp = requests.post(
            f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
            json={
                "chat_id": TELEGRAM_CHAT_ID,
                "text": message,
                "parse_mode": "Markdown",
            },
            timeout=10,
        )
        resp.raise_for_status()
        logger.info("Telegram alert sent successfully.")
        return True
    except Exception as e:
        logger.error(f"Failed to send Telegram alert: {e}")
        return False
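One thing the alert layer does not handle is message length. The Telegram Bot API limits `sendMessage` text to 4096 characters, and a snapshot with several HIGH signals for a busy market can approach that. The helper below is a minimal sketch, not part of the original tutorial: `split_message` is a hypothetical name, and it simply packs whole lines into chunks that stay under the limit, hard-splitting only when a single line is longer than the limit on its own.

```python
def split_message(text, limit=4096):
    """Split text into chunks of at most `limit` characters, breaking on newlines."""
    chunks = []
    current = ""
    for line in text.split("\n"):
        # A single line longer than the limit has to be cut mid-line
        while len(line) > limit:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:limit])
            line = line[limit:]
        candidate = line if not current else current + "\n" + line
        if len(candidate) > limit:
            # Adding this line would overflow, so close the current chunk
            chunks.append(current)
            current = line
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


# Small limit used here only to make the behaviour visible
for part in split_message("alpha\nbeta\ngamma\ndelta", limit=11):
    print(repr(part))
```

To use it, loop over `split_message(message)` and call `send_telegram` once per chunk. Be aware that a chunk boundary can separate an opening `*` from its closing `*`, which would make Telegram reject the Markdown, so splitting between signals rather than between arbitrary lines is the safer variant.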
# monitor.py
import schedule
import time
import logging
from datetime import datetime

from config import SYMBOLS, POLL_INTERVAL
from data_layer import build_market_snapshot
from signal_engine import analyze_snapshot
from alert_layer import format_alert_message, send_telegram

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("monitor.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Cooldown tracking: prevent duplicate alerts
_last_alert: dict = {}
COOLDOWN_SECONDS = {
    "HIGH": 1800,    # 30 min cooldown for high alerts
    "MEDIUM": 3600,  # 1 hour cooldown for medium alerts
    "LOW": 21600,    # 6 hour cooldown for low alerts
}


def _should_alert(symbol: str, level: str) -> bool:
    """Return True if no alert of this level fired for the symbol within its cooldown."""
    key = f"{symbol}_{level}"
    now = datetime.utcnow()
    if key not in _last_alert:
        return True
    elapsed = (now - _last_alert[key]).total_seconds()
    return elapsed >= COOLDOWN_SECONDS.get(level, 3600)


def run_cycle(symbol: str) -> None:
    """Execute one full monitoring cycle for a symbol."""
    try:
        # 1. Fetch data
        snapshot = build_market_snapshot(symbol)

        # 2. Analyze
        signals = analyze_snapshot(snapshot)

        # 3. Log (NaN defaults: a string default such as 'N/A' would raise
        #    ValueError under the numeric format specs)
        logger.info(
            f"{symbol} | Level: {signals.max_level} | "
            f"Signals: {len(signals.signals)} | "
            f"FR: {snapshot['funding_rate'].get('avg_funding_rate_pct', float('nan')):.4f}% | "
            f"OI chg: {snapshot['open_interest'].get('change_pct', float('nan')):+.2f}%"
        )

        # 4. Alert if warranted
        if signals.has_alerts and _should_alert(symbol, signals.max_level):
            message = format_alert_message(snapshot, signals)
            if send_telegram(message):
                _last_alert[f"{symbol}_{signals.max_level}"] = datetime.utcnow()

    except Exception as e:
        logger.error(f"Cycle failed for {symbol}: {e}", exc_info=True)
        send_telegram(f"⚠️ Monitor error for {symbol}: {str(e)[:100]}")


def main():
    logger.info("=" * 60)
    logger.info(" CoinGlass Derivatives Monitor - Starting Up")
    logger.info(f" Watching: {', '.join(SYMBOLS)}")
    logger.info(f" Poll interval: every {POLL_INTERVAL} minutes")
    logger.info("=" * 60)

    # Run immediately on startup
    for symbol in SYMBOLS:
        run_cycle(symbol)

    # Schedule recurring runs
    for symbol in SYMBOLS:
        schedule.every(POLL_INTERVAL).minutes.do(run_cycle, symbol=symbol)

    while True:
        schedule.run_pending()
        time.sleep(30)


if __name__ == "__main__":
    main()
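To check the cooldown gate without waiting half an hour, here is a small standalone sketch. It mirrors the logic of `_should_alert` but takes the current time as an argument so it can be exercised with fixed timestamps. The `CooldownTracker` class and its `now` parameter are illustrative assumptions, not part of the monitor itself.

```python
from datetime import datetime, timedelta

# Same cooldown windows as monitor.py, in seconds.
COOLDOWN_SECONDS = {"HIGH": 1800, "MEDIUM": 3600, "LOW": 21600}


class CooldownTracker:
    """Hypothetical helper: remembers when each (symbol, level) pair last alerted."""

    def __init__(self) -> None:
        self._last_alert: dict = {}

    def should_alert(self, symbol: str, level: str, now: datetime) -> bool:
        last = self._last_alert.get(f"{symbol}_{level}")
        if last is None:
            return True  # nothing sent yet for this symbol/level
        elapsed = (now - last).total_seconds()
        return elapsed >= COOLDOWN_SECONDS.get(level, 3600)

    def mark_sent(self, symbol: str, level: str, now: datetime) -> None:
        self._last_alert[f"{symbol}_{level}"] = now


tracker = CooldownTracker()
t0 = datetime(2026, 5, 11, 8, 0, 0)

first = tracker.should_alert("BTC", "HIGH", t0)
tracker.mark_sent("BTC", "HIGH", t0)

# Ten minutes later the HIGH alert is still inside its 30-minute window...
too_soon = tracker.should_alert("BTC", "HIGH", t0 + timedelta(minutes=10))
# ...but a MEDIUM alert for the same symbol is tracked under a separate key.
other_level = tracker.should_alert("BTC", "MEDIUM", t0 + timedelta(minutes=10))
# Exactly at the 30-minute mark the HIGH alert may fire again.
after_window = tracker.should_alert("BTC", "HIGH", t0 + timedelta(minutes=30))
```

Because the key combines symbol and level, an escalation from MEDIUM to HIGH is never suppressed by the MEDIUM cooldown, which is usually the behavior you want.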
python monitor.py
2026-05-11 08:00:01 [INFO] Fetching snapshot for BTC...
2026-05-11 08:00:03 [INFO] BTC | Level: MEDIUM | Signals: 2 | FR: 0.0312% | OI chg: +1.82%
2026-05-11 08:00:03 [INFO] Telegram alert sent successfully.
2026-05-11 08:00:03 [INFO] Fetching snapshot for ETH...
2026-05-11 08:00:05 [INFO] ETH | Level: NONE | Signals: 0 | FR: 0.0089% | OI chg: -0.41%
# On your server
git clone your-repo
cd crypto-derivatives-monitor

# Create the virtualenv that the systemd unit's ExecStart path expects
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Set up your .env file
nano .env

# Run with screen (persists after SSH disconnect)
screen -S monitor
python monitor.py
# Press Ctrl+A then D to detach

# Or set up as a systemd service for auto-restart:
sudo nano /etc/systemd/system/crypto-monitor.service
[Unit]
Description=CoinGlass Crypto Derivatives Monitor
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/crypto-derivatives-monitor
ExecStart=/home/ubuntu/crypto-derivatives-monitor/venv/bin/python monitor.py
Restart=on-failure
RestartSec=30

[Install]
WantedBy=multi-user.target
sudo systemctl enable crypto-monitor
sudo systemctl start crypto-monitor
sudo systemctl status crypto-monitor

# Follow logs in real time
sudo journalctl -u crypto-monitor -f
┌──────────────────────────────────────────────────────┐
│ Monthly Operating Cost │
├─────────────────────────┬───────────────────────────┤
│ CoinGlass API │ $29/month (Hobbyist) │
│ Claude/GPT (optional) │ $5–15/month │
│ VPS (t3.micro / equiv) │ $5–10/month │
│ Telegram Bot │ Free │
├─────────────────────────┼───────────────────────────┤
│ Total │ ~$40–55/month │
└─────────────────────────┴───────────────────────────┘
Signal Combination Reference
════════════════════════════════════════════════════════════════════════════════════
Funding Rate    OI Change    L/S Ratio       Interpretation
────────────    ─────────    ─────────       ──────────────
> +0.10%        Surging      Longs > 65%     🔴 Classic top setup. Avoid new longs.
< -0.05%        Surging      Shorts > 60%    🟢 Short squeeze forming. Watch for bullish catalyst.
Any extreme     Falling      Normalizing     🟡 Deleveraging in progress. Wait for dust to settle.
Top traders     OI stable    Retail long     🔴 Distribution signal.
short vs                     > 60%           Smart money fading retail.
retail long
Top traders     OI stable    Retail          🟢 Accumulation signal.
long vs                      short > 55%     Smart money buying fear.
retail short
════════════════════════════════════════════════════════════════════════════════════
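The reference table can be turned into a small lookup function. What follows is a minimal sketch, not the tutorial's signal engine. The function name `classify_setup`, the `oi_trend` labels, and the return strings are all hypothetical. It also makes two simplifying assumptions: "top traders short" is read as a top-trader long share below 50%, and the "Normalizing" L/S condition in the deleveraging row is not checked because the table gives no threshold for it.

```python
from typing import Optional


def classify_setup(
    funding_pct: float,      # average funding rate in percent, e.g. 0.12 for 0.12%
    oi_trend: str,           # assumed labels: "surging", "falling", or "stable"
    retail_long_pct: float,  # global (retail) long share, 0-100
    top_long_pct: float,     # top-trader long share, 0-100
) -> Optional[str]:
    """Map one snapshot onto a row of the reference table, or None if no row fits."""
    retail_short_pct = 100.0 - retail_long_pct
    extreme_funding = funding_pct > 0.10 or funding_pct < -0.05

    if oi_trend == "surging":
        if funding_pct > 0.10 and retail_long_pct > 65:
            return "classic_top"
        if funding_pct < -0.05 and retail_short_pct > 60:
            return "short_squeeze_forming"

    if oi_trend == "falling" and extreme_funding:
        return "deleveraging"

    if oi_trend == "stable":
        # Assumption: top traders count as "short" below a 50% long share.
        if top_long_pct < 50 and retail_long_pct > 60:
            return "distribution"
        if top_long_pct > 50 and retail_short_pct > 55:
            return "accumulation"

    return None


examples = {
    "top": classify_setup(0.12, "surging", 70, 55),
    "squeeze": classify_setup(-0.08, "surging", 35, 50),
    "delever": classify_setup(0.15, "falling", 55, 50),
    "distribution": classify_setup(0.01, "stable", 64, 42),
    "accumulation": classify_setup(0.01, "stable", 40, 58),
    "no_match": classify_setup(0.01, "stable", 52, 51),
}
```

Returning `None` when no row matches keeps the function honest: most snapshots will not fit any textbook setup, and the table is meant as a reading aid, not a complete rule set.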
- Fetches real-time funding rates, open interest, liquidation data, and long/short ratios
- Detects anomalous market conditions automatically
- Sends Telegram alerts when risk signals fire
- Runs 24/7 on a $5/month cloud server

- Why Derivatives Data Matters
- Understanding the Four Key Metrics
- Setting Up CoinGlass API V4
- Building the Data Layer
- Building the Signal Engine
- Building the Alert Layer
- Putting It All Together
- Deploying to Production
- What to Watch For

- Positive funding rate: Longs pay shorts. The market is leaning bullish, and longs are paying a premium to maintain their positions.
- Negative funding rate: Shorts pay longs. The market is leaning bearish.

- OI surging + price rising: New money is entering long positions. Bullish conviction is building.
- OI surging + price falling: New money is entering short positions. Bearish conviction is building.
- OI falling + price rising: Shorts are covering (short squeeze). Rally may lack sustainability.
- OI falling + price falling: Longs are capitulating. Potential exhaustion of selling pressure.

- Global L/S ratio: All accounts, dominated by retail traders
- Top trader L/S ratio: Top 5–20% of accounts by position size, closer to "smart money"

- Go to coinglass.com/pricing
- Create an account and choose a plan (free tier available for testing)
- Navigate to your dashboard and generate an API key
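
The four open interest and price combinations listed above reduce to a two-by-two lookup on the signs of the two changes. The sketch below is one way to express that. The function name `oi_price_regime` and the returned labels are illustrative, and treating a change of exactly zero as "not rising" is an assumption; a real monitor would likely want a dead band around zero.

```python
def oi_price_regime(oi_change_pct: float, price_change_pct: float) -> str:
    """Classify a period by the direction of open interest and price."""
    oi_up = oi_change_pct > 0        # assumption: zero counts as "not rising"
    price_up = price_change_pct > 0

    if oi_up and price_up:
        return "new longs entering"      # bullish conviction building
    if oi_up and not price_up:
        return "new shorts entering"     # bearish conviction building
    if not oi_up and price_up:
        return "shorts covering"         # short squeeze; rally may not last
    return "longs capitulating"          # possible selling exhaustion


regimes = [
    oi_price_regime(1.82, 0.9),    # OI up, price up
    oi_price_regime(2.10, -1.4),   # OI up, price down
    oi_price_regime(-0.41, 0.6),   # OI down, price up
    oi_price_regime(-3.00, -2.2),  # OI down, price down
]
```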