Tools: FMZ Platform Gold Rush: Practical Analysis of a Highly Flexible Python Trend Trading Framework

Tools: FMZ Platform Gold Rush: Practical Analysis of a Highly Flexible Python Trend Trading Framework

Source: Dev.to

Framework Structure and Functions ## Initialization and Setup ## Data Management and Storage ## Strategy Interaction and Command Processing ## Trading and Order Management ## Risk Management and Profit Statistics ## Trend Judgment and Trading Logic ## Strategy Core Logic ## Status Monitoring and Log Output ## Main Function and Strategy Execution ## Framework Features ## Applicability ## Summary FMZ Platform Gold Rush: Practical Analysis of a Highly Flexible Python Trend Trading Framework I often browse the FMZ platform and always find treasures. Today I discovered a trend strategy from 2021, and I'm amazed by the author's sophistication and comprehensive code architecture with high flexibility. The original strategy was in JavaScript, but I've rewritten it in Python for the convenience of Python developers. To be honest, when I first got into quantitative trading, many newcomers take many detours. We often encounter various problems such as order failures, poor risk control leading to losses, and data loss after strategy restarts. Later, I gradually realized that having a good framework is really important - it can help us avoid many pitfalls. This trend strategy framework is exactly such a good thing. It's not just a simple trading strategy, but more like a toolbox that handles basic but important functions like order placement, stop-loss, and data management for you. You only need to focus on core questions like "when to buy" and "when to sell". Moreover, this framework is designed to be very open - you can easily replace EMA with MACD, RSI, or other indicators you prefer. Want to do trend following? No problem. Want to try mean reversion? That works too. Even want to combine multiple indicators? Totally possible. This flexibility makes it very practical - one codebase that can be modified to experiment with different ideas. Today I'm sharing this framework, hoping it can help friends who are exploring the quantitative trading path. Let me introduce the various parts of this framework in detail below. I believe you'll find it very useful after reading. Unlike the multiple independent functions used in multi-asset trading frameworks, this framework attempts to use Class format to organize and manage various parts of the strategy. This object-oriented design approach not only improves code maintainability and extensibility but also makes strategy components more modular, facilitating subsequent adjustments and optimizations. The framework mainly consists of the following modules, with each module's functions having specific purposes to ensure strategy flexibility and practicality. init Function Function: The init function is the initialization method of the strategy class, responsible for setting basic strategy configurations, initializing variables, and obtaining market information. This function ensures the strategy is properly configured before running, guaranteeing smooth execution of subsequent trading operations. initDatas Function Function: Initialize data during strategy runtime, including account assets, profit statistics, etc. saveStrategyRunTime Function Function: Save the strategy's start runtime for subsequent statistics and monitoring. setStrategyRunTime Function Function: Set the strategy's start runtime and save to local storage. getDaysFromTimeStamp Function Function: Calculate the difference in days between two timestamps, used for counting strategy runtime duration. saveUserDatasLocal Function Function: Save key data during strategy runtime locally for recovery when strategy restarts. readUserDataLocal Function Function: Read locally saved user data for data recovery when strategy restarts. clearUserDataLocal Function Function: Clear locally saved user data, typically used for strategy reset or debugging. runCmd Function Function: Process commands sent by users through the interactive interface, such as clearing local data, modifying order quantities, etc. orderDirectly Function Function: Place orders directly based on direction and price, supporting opening and closing operations. openLong Function Function: Open long position, place orders based on price and quantity. openShort Function Function: Open short position, place orders based on price and quantity. coverLong Function Function: Close long position, place orders based on price and quantity. coverShort Function Function: Close short position, place orders based on price and quantity. getRealOrderSize Function Function: Recalculate actual order quantity based on price and quantity, supporting orders based on margin ratio. getSinglePositionMargin Function Function: Calculate margin occupied by a single position. getSinglePositionProfit Function Function: Calculate profit and profit rate of a single position. calculateForcedPrice Function Function: Calculate the liquidation price of positions. getMaxOrderSize Function Function: Calculate maximum order quantity. getAccountAsset Function Function: Calculate total account assets, including positions and available balance. calculateProfit Function Function: Calculate and record strategy profit situation. isEnoughAssetToOrder Function Function: Check if account funds are sufficient for placing orders. runInKLinePeriod Function Function: Determine whether to execute strategy logic based on K-line period. trendJudgment Function (Core Trend Judgment Module) Function: Judge current trend based on technical indicators. This is the most flexible module in the entire framework, where users can replace different technical indicators according to their needs for trend judgment. Current Implementation: Uses EMA (Exponential Moving Average) combined with standard deviation to judge trends. **Extensibility: **This function is designed as a pluggable module, users can easily replace it with other technical indicators, such as: precise trend judgment Steps: stopLoss Function Function: Execute stop-loss operations based on stop-loss rules. takeProfit Function Function: Execute take-profit operations based on take-profit rules. trackingTakeProfit Function Function: Execute take-profit operations based on trailing take-profit rules. order Function Function: Execute order placement based on trend judgment results. trendStrategy Function Function: The core logic function of the strategy, responsible for executing trend judgment, stop-loss and take-profit, trailing take-profit, and order placement operations. printLogStatus Function Function:Print strategy running status, account information, and position situation. main Function Function: The main function of the strategy, responsible for initializing the strategy and cyclically executing strategy logic. Flexible Trend Judgment: Through EMA indicators and standard deviation, the strategy can flexibly judge market trends, suitable for different market environments. This function is just an example; users can use different technical indicators (such as RSI, MACD, Bollinger Bands, etc.) for trend judgment according to their needs. Diverse Stop-Loss and Take-Profit Mechanisms: Supports fixed percentage stop-loss and take-profit as well as trailing take-profit, meeting the needs of traders with different risk preferences. Local Data Management: Strategy runtime data and user data are saved locally, ensuring the strategy can recover to its previous state after restart. Interactive Commands: Supports interaction with the strategy through command line, making it convenient for users to adjust strategy parameters or execute specific operations. This framework is not only suitable for cryptocurrency markets but can also be extended by adding different technical indicators (such as RSI, MACD, etc.) in the trendJudgment function to adapt to different trading strategy needs. Additionally, this framework can be specifically modified for spot markets or multi-asset contracts, with high flexibility and extensibility. Spot Market Support: Currently, this framework mainly targets contract markets; in the future, it can be extended to support spot market trading strategies. Multi-Asset Contracts: By adding multi-asset contract support, the strategy can simultaneously monitor and trade multiple cryptocurrencies, improving capital utilization. Machine Learning Integration: Combined with machine learning algorithms, further improve the accuracy of trend judgment and the intelligence level of strategies. Risk Management Optimization: Further optimize risk management mechanisms, such as dynamically adjusting leverage multiples, multi-level stop-loss and take-profit, etc., to improve strategy robustness. As a comprehensive and highly flexible automated trading system, this framework is suitable for trend trading in cryptocurrency markets. Through continuous optimization and expansion, it has the potential to become a powerful tool for cryptocurrency traders in the future, helping users better develop their own quantitative strategies. The "Cryptocurrency Trend Strategy Trading Framework" has a complete structure, and although the code volume is considerable, from the perspective of live trading, it basically covers the core functional modules needed in trend trading. Therefore, whether from the perspective of learning trading strategies or practical application, this framework has important reference value and practical significance. Its comprehensive functionality and flexibility enable it to adapt to different market environments, providing powerful support for everyone. The FMZ platform serves as a treasure trove of quantitative trading, containing a wealth of knowledge and strategies for cryptocurrency quantitative trading. Each strategy embodies the wisdom and experience of developers. Welcome everyone to come here to prospect for gold and explore more valuable trading strategies and technical sharing. Thanks to all users with innovative spirit and willingness to share - it is because of everyone's contributions that this platform has become an important place for quantitative trading learning and communication, jointly helping everyone improve their quantitative trading skills and expertise. Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse COMMAND_BLOCK: '''backtest start: 2024-11-26 00:00:00 end: 2024-12-03 00:00:00 period: 1d basePeriod: 1d exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT"}] ''' import json, talib import numpy as np class TrendStrategy: def __init__(self): # Basic settings self._Currency = TradeCurrency self._Interval = Interval self._UseQuarter = UseQuarter self._UseContract = TradeCurrency + ('.swap' if self._UseQuarter else '.quarter') self._OnlyTrendJudgment = OnlyTrendJudgment self._EnableMessageSend = EnableMessageSend # Trend judgment self._RunInKLinePeriod = RunInKLinePeriod self._KLinePeriod = KLinePeriod self._EmaLength = EmaLength self._EmaCoefficient = EmaCoefficient self._UseStddev = UseStddev self._UseRecordsMiddleValue = UseRecordsMiddleValue self._StddevLength = StddevLength self._StddevDeviations = StddevDeviations # Order settings self._MarginLevel = MarginLevel self._OrderSize = OrderSize self._OrderByMargin = OrderByMargin self._OrderMarginPercent = OrderMarginPercent self._PricePrecision = None self._AmountPrecision = None self._OneSizeInCurrentCoin = None self._QuarterOneSizeValue = None # Take profit and stop loss self._UseStopLoss = UseStopLoss self._StopLossPercent = StopLossPercent self._UseTakeProfit = UseTakeProfit self._TakeProfitPercent = TakeProfitPercent self._UseTrackingTakeProfit = UseTrackingTakeProfit self._UsePositionRetracement = UsePositionRetracement self._TakeProfitTriggerPercent = TakeProfitTriggerPercent self._CallBakcPercent = CallBakcPercent # Strategy variables self._LastBarTime = 0 self._TrendWhenTakeProfitOrStopLoss = 0 self._HadStopLoss = False self._TriggeredTakeProfit = False self._PeakPriceInPosition = 0 self._HadTakeProfit = False self._PriceCrossEMAStatus = 0 # Statistical variables self._InitAsset = 0 self._ProfitLocal = 0 self._TakeProfitCount = 0 self._TradeCount = 0 self.StrategyRunTimeStampString = "strategy_run_time" self._StrategyDatas = {"start_run_timestamp": 0, "others": ""} self._UserDatas = None # Relatively fixed parameters self._MaintenanceMarginRate = 0.004 self._TakerFee = 0.0005 self._IsUsdtStandard = False # Get contract information ticker = _C(exchange.GetTicker, self._UseContract) marketInfo = exchange.GetMarkets()[self._UseContract] Log('Get market information:', marketInfo) self._PricePrecision = marketInfo['PricePrecision'] self._AmountPrecision = marketInfo['AmountPrecision'] self._OneSizeInCurrentCoin = marketInfo['CtVal'] self._QuarterOneSizeValue = marketInfo['CtVal'] exchange.SetCurrency(self._Currency) exchange.SetMarginLevel(self._UseContract, self._MarginLevel) exchange.SetPrecision(self._PricePrecision, self._AmountPrecision) # Initialize data def initDatas(self): self.saveStrategyRunTime() self.readUserDataLocal() self._InitAsset = self._UserDatas["init_assets"] self._ProfitLocal = self._UserDatas["profit_local"] self._TakeProfitCount = self._UserDatas["take_profit_count"] self._TradeCount = self._UserDatas["trade_count"] if self._OrderByMargin: self.getRealOrderSize(-1, self._OrderSize) Log("Order quantity has been recalculated:", self._OrderSize) if self._UseTakeProfit and self._UseTrackingTakeProfit: raise Exception("Take profit and trailing take profit cannot be used simultaneously!") # Set contract def setContract(self): self._IsUsdtStandard = "USDT" in self._Currency exchange.SetCurrency(self._Currency) if self._UseQuarter: exchange.SetContractType("quarter") else: exchange.SetContractType("swap") # Save program start run time (second-level timestamp) def saveStrategyRunTime(self): local_data_strategy_run_time = _G(self.StrategyRunTimeStampString) if local_data_strategy_run_time is None: self._StrategyDatas["start_run_timestamp"] = Unix() _G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"]) else: self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time # Set program start run time (second-level timestamp) def setStrategyRunTime(self, timestamp): _G(self.StrategyRunTimeStampString, timestamp) self._StrategyDatas["start_run_timestamp"] = timestamp # Calculate days between two timestamps (parameters are second-level timestamps) def getDaysFromTimeStamp(self, start_time, end_time): if end_time < start_time: return 0 return (end_time - start_time) // (60 * 60 * 24) # Save data locally def saveUserDatasLocal(self): self._UserDatas = { "init_assets": self._InitAsset, "profit_local": self._ProfitLocal, "take_profit_count": self._TakeProfitCount, "trade_count": self._TradeCount } # Store locally _G(exchange.GetLabel(), self._UserDatas) Log("All data has been saved locally.") # Read user local data, run once when program starts def readUserDataLocal(self): user_data = _G(exchange.GetLabel()) if user_data is None: self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker)) self._UserDatas = { "init_assets": self._InitAsset, "profit_local": 0, "take_profit_count": 0, "trade_count": 0 } else: self._UserDatas = user_data # Clear user local data, run when interactive button is clicked def clearUserDataLocal(self): _G(exchange.GetLabel(), None) Log(exchange.GetLabel(), ": Local data has been cleared.") # Strategy interaction def runCmd(self): cmd = GetCommand() if cmd: # Detect interactive commands Log("Received command:", cmd, "#FF1CAE") if cmd.startswith("ClearLocalData:"): # Clear local data self.clearUserDataLocal() elif cmd.startswith("SaveLocalData:"): # Save data locally self.saveUserDatasLocal() elif cmd.startswith("ClearLog:"): # Clear logs log_reserve = cmd.replace("ClearLog:", "") LogReset(int(log_reserve)) elif cmd.startswith("OrderSize:"): # Modify order quantity if self._OrderByMargin: Log("Order by margin is already enabled, cannot modify order quantity directly!") else: order_size = int(cmd.replace("OrderSize:", "")) self._OrderSize = order_size Log("Order quantity has been modified to:", self._OrderSize) elif cmd.startswith("OrderMarginPercent:"): # Modify order margin percentage if self._OrderByMargin: order_margin_percent = float(cmd.replace("OrderMarginPercent:", "")) self._OrderMarginPercent = order_margin_percent Log("Order margin percentage:", self._OrderMarginPercent, "%") else: Log("Order by margin is not enabled, cannot modify order margin percentage!") # Trading functions def orderDirectly(self, distance, price, amount): tradeFunc = None if amount <= 0: raise Exception("Parameter error, order quantity is already less than 0!") if distance == "buy": tradeFunc = exchange.Buy elif distance == "sell": tradeFunc = exchange.Sell elif distance == "closebuy": tradeFunc = exchange.Sell else: tradeFunc = exchange.Buy exchange.SetDirection(distance) return tradeFunc(price, amount) def openLong(self, price, amount): real_amount = self.getRealOrderSize(price, amount) return self.orderDirectly("buy", price, real_amount) def openShort(self, price, amount): real_amount = self.getRealOrderSize(price, amount) return self.orderDirectly("sell", price, real_amount) def coverLong(self, price, amount): return self.orderDirectly("closebuy", price, amount) def coverShort(self, price, amount): return self.orderDirectly("closesell", price, amount) # Recalculate order quantity def getRealOrderSize(self, price, amount): real_price = price if price != -1 else _C(exchange.GetTicker).Last if self._OrderByMargin: if self._IsUsdtStandard: self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # USDT-margined quantity (leveraged quantity) else: self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # Coin-margined quantity (leveraged quantity) else: self._OrderSize = amount return self._OrderSize # Get margin occupied by single position def getSinglePositionMargin(self, position, ticker): position_margin = 0 if len(position) > 0: if self._IsUsdtStandard: position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel else: position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel return position_margin # Get profit and profit percentage of unidirectional position def getSinglePositionProfit(self, position, ticker): if len(position) == 0: return [0, 0] price = ticker.Last position_margin = self.getSinglePositionMargin(position, ticker) position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel position_profit = position_margin * position_profit_percent return [position_profit, position_profit_percent] # Calculate liquidation price def calculateForcedPrice(self, account, position, ticker): position_profit = 0 total_avail_balance = 0 forced_price = 0 position_margin = self.getSinglePositionMargin(position, ticker) [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker) if self._IsUsdtStandard: total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance if position[0].Type == PD_LONG: forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount) else: forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount) else: total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks if position[0].Type == PD_LONG: forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price) else: forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price) if forced_price < 0: forced_price = 0 return forced_price # Calculate maximum order quantity def getMaxOrderSize(self, margin_level, ticker, account): max_order_size = 0 if self._IsUsdtStandard: max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last) else: max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level return _N(max_order_size, self._AmountPrecision) # Get account assets def getAccountAsset(self, position, account, ticker): # Calculate account initial assets under different situations account_asset = 0 position_margin = self.getSinglePositionMargin(position, ticker) if self._IsUsdtStandard: if len(position) > 0: account_asset = account.Balance + account.FrozenBalance + position_margin else: account_asset = account.Balance + account.FrozenBalance else: if len(position) > 0: account_asset = account.Stocks + account.FrozenStocks + position_margin else: account_asset = account.Stocks + account.FrozenStocks return account_asset # Profit statistics def calculateProfit(self, ticker): # Re-obtain account position and assets position = _C(exchange.GetPosition) account = _C(exchange.GetAccount) # Current total profit - previous total profit = current profit current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal self._ProfitLocal += current_profit if current_profit > 0: self._TakeProfitCount += 1 self._TradeCount += 1 LogProfit(_N(self._ProfitLocal, 4), " Current profit:", _N(current_profit, 6)) self.saveUserDatasLocal() # Check if there are enough funds to place order def isEnoughAssetToOrder(self, order_size, ticker): is_enough = True account = _C(exchange.GetAccount) if self._IsUsdtStandard: if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel: is_enough = False else: if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel: is_enough = False return is_enough # Run strategy core according to K-line period def runInKLinePeriod(self, records): bar_time = records[-1].Time if self._RunInKLinePeriod and self._LastBarTime == bar_time: return False self._LastBarTime = bar_time return True # Trend judgment module (editable specific indicators) def trendJudgment(self, records): # Check if price crosses EMA def checkPriceCrossEma(price, ema_value): if self._PriceCrossEMAStatus == 0: if price <= ema_value: self._PriceCrossEMAStatus = -1 else: self._PriceCrossEMAStatus = 1 elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value): self._PriceCrossEMAStatus = 2 # Completed crossing # EMA long/short judgment ema_long = False ema_short = False price = records[-2].Close # Close price of already closed K-line ema = TA.EMA(records, self._EmaLength) ema_value = ema[-2] # EMA value corresponding to closed K-line ema_upper = ema_value * (1 + self._EmaCoefficient) ema_lower = ema_value * (1 - self._EmaCoefficient) checkPriceCrossEma(price, ema_value) if price > ema_upper: ema_long = True elif price < ema_lower: ema_short = True # Standard deviation judgment in_trend = False if self._UseStddev: records_data = [] for i in range(len(records)): records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close) records_data = np.array(records_data) # Convert list to np.array stddev = np.std(records_data, ddof=1) # Use numpy to calculate standard deviation if stddev > self._StddevDeviations: in_trend = True else: in_trend = True # Trend judgment long = in_trend and ema_long short = in_trend and ema_short if long: Log("Current trend is: Long", self._EnableMessageSend and "@" or "#00FF7F") elif short: Log("Current trend is: Short", self._EnableMessageSend and "@" or "#FF0000") else: Log("Current trend is: Sideways", self._EnableMessageSend and "@" or "#007FFF") return [long, short] # Stop loss def stopLoss(self, position, ticker): stop_loss_price = 0 price = ticker.Last if len(position) == 1 and self._UseStopLoss: if position[0].Type == PD_LONG: stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100) if price < stop_loss_price: self.coverLong(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = 1 self._HadStopLoss = True Log("Long position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") elif position[0].Type == PD_SHORT: stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100) if price > stop_loss_price: self.coverShort(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = -1 self._HadStopLoss = True Log("Short position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") # Take profit def takeProfit(self, position, ticker): take_profit_price = 0 price = ticker.Last if len(position) == 1 and self._UseTakeProfit: if position[0].Type == PD_LONG: take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100) if price > take_profit_price: self.coverLong(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = 1 self._HadTakeProfit = True Log("Long position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") elif position[0].Type == PD_SHORT: take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100) if price < take_profit_price: self.coverShort(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = -1 self._HadTakeProfit = True Log("Short position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") # Trailing take profit def trackingTakeProfit(self, position, ticker): take_profit_price = 0 trigger_price = 0 price = ticker.Last if len(position) > 0 and self._UseTrackingTakeProfit: if position[0].Type == PD_LONG: # Long position holding if self._TriggeredTakeProfit: # Trigger price reached, monitor for take profit self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # Update price peak if self._UsePositionRetracement: take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # Calculate retracement take profit price else: take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # Calculate retracement take profit price if price < take_profit_price: self.coverLong(-1, position[0].Amount) # Close long self.calculateProfit(ticker) self._TriggeredTakeProfit = False # Reset trigger flag self._TrendWhenTakeProfitOrStopLoss = 1 # Record trend when taking profit self._HadTakeProfit = True # Record that take profit occurred Log("Long position trailing take profit: Position price peak:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE") else: # Monitor if trailing take profit trigger price is reached trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100) if price > trigger_price: self._TriggeredTakeProfit = True # Trigger trailing take profit self._PeakPriceInPosition = price # Record price peak Log("Long position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6)) elif position[0].Type == PD_SHORT: # Short position holding if self._TriggeredTakeProfit: # Trigger price reached, monitor for take profit self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # Update price low if self._UsePositionRetracement: take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # Calculate retracement take profit price else: take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # Calculate retracement take profit price if price > take_profit_price: self.coverShort(-1, position[0].Amount) # Close short self.calculateProfit(ticker) self._TriggeredTakeProfit = False # Reset trigger flag self._TrendWhenTakeProfitOrStopLoss = -1 # Record trend when taking profit self._HadTakeProfit = True # Record that take profit occurred Log("Short position trailing take profit: Position price low:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE") else: # Monitor if trailing take profit trigger price is reached trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100) if price < trigger_price: self._TriggeredTakeProfit = True # Trigger trailing take profit self._PeakPriceInPosition = price # Record price low Log("Short position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6)) # Place order def order(self, long, short, position, ticker): position_size = position[0].Amount if len(position) > 0 else 0 position_type = position[0].Type if len(position) > 0 else None if long: # Long trend if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1: # Stop loss or take profit occurred, and trend was long when it happened, do not go long again return if position_size > 0 and position_type == PD_SHORT: self.coverShort(-1, position_size) self.calculateProfit(ticker) elif position_size > 0 and position_type == PD_LONG: # Long position holding, do not place duplicate orders return else: # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order if self._PriceCrossEMAStatus != 2: return if self.isEnoughAssetToOrder(self._OrderSize, ticker): self.openLong(-1, self._OrderSize) self._HadStopLoss = False self._HadTakeProfit = False else: raise Exception("Insufficient order amount!") elif short: # Short trend if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1: # Stop loss or take profit occurred, and trend was short when it happened, do not go short again return if position_size > 0 and position_type == PD_LONG: self.coverLong(-1, position_size) self.calculateProfit(ticker) elif position_size > 0 and position_type == PD_SHORT: # Short position holding, do not place duplicate orders return else: # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order if self._PriceCrossEMAStatus != 2: return if self.isEnoughAssetToOrder(self._OrderSize, ticker): self.openShort(-1, self._OrderSize) self._HadStopLoss = False self._HadTakeProfit = False else: raise Exception("Insufficient order amount!") # Trend strategy def trendStrategy(self): ticker = _C(exchange.GetTicker) position = _C(exchange.GetPosition) account = _C(exchange.GetAccount) records = _C(exchange.GetRecords, self._KLinePeriod * 60) if len(position) > 1: Log(position) raise Exception("Simultaneous long and short positions!") # Strategy interaction self.runCmd() # Status bar information printing self.printLogStatus(ticker, account, position) # Stop loss self.stopLoss(position, ticker) # Take profit self.takeProfit(position, ticker) # Trailing take profit self.trackingTakeProfit(position, ticker) # Run strategy according to K-line period if not self.runInKLinePeriod(records): return # Trend judgment and order placement long = False short = False [long, short] = self.trendJudgment(records) if not self._OnlyTrendJudgment: self.order(long, short, position, ticker) # Status bar information printing def printLogStatus(self, ticker, account, position): table_overview = { "type": "table", "title": "Strategy Overview", "cols": ["Start Time", "Running Days", "Trade Count", "Win Rate", "Est. Monthly %", "Est. Annual %"], "rows": [] } table_account = { "type": "table", "title": "Account Funds", "cols": ["Current Asset", "Initial Asset", "Available Balance", "Frozen Balance", "Max Order Size", "Profit", "Profit %"], "rows": [] } table_position = { "type": "table", "title": "Position Info", "cols": ["Symbol", "Leverage", "Avg Entry Price", "Direction", "Size", "Margin", "Est. Liq. Price", "Unrealized PnL", "Unrealized PnL %"], "rows": [] } i = 0 # Strategy Overview the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix()) monthly_rate_of_profit = 0 if the_running_days > 1: monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30 table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount, 0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"), str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"]) # Account Funds current_asset = self.getAccountAsset(position, account, ticker) max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account) asset_profit = current_asset - self._InitAsset asset_profit_percent = asset_profit / self._InitAsset table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4), _N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4), str(_N(asset_profit_percent * 100, 2)) + "%"]) # Position Info position_direction = "" forced_cover_up_price = 0 position_profit_percent = 0 position_profit = 0 position_margin = 0 if len(position) == 0: table_position["rows"].append(["No Position", "-", "-", "-", "-", "-", "-", "-", "-"]) else: position_direction = "Long" if position[0].Type == PD_LONG else "Short" [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker) position_margin = self.getSinglePositionMargin(position, ticker) forced_cover_up_price = self.calculateForcedPrice(account, position, ticker) table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount, _N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"]) # Print tables LogStatus('`' + json.dumps(table_overview) + '`\n' + '`' + json.dumps(table_account) + '`\n' + '`' + json.dumps(table_position) + '`\n') # main def main(): exchange.IO('simulate', True) strategy = TrendStrategy() strategy.setContract() strategy.initDatas() while True: strategy.trendStrategy() Sleep(strategy._Interval) Enter fullscreen mode Exit fullscreen mode COMMAND_BLOCK: '''backtest start: 2024-11-26 00:00:00 end: 2024-12-03 00:00:00 period: 1d basePeriod: 1d exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT"}] ''' import json, talib import numpy as np class TrendStrategy: def __init__(self): # Basic settings self._Currency = TradeCurrency self._Interval = Interval self._UseQuarter = UseQuarter self._UseContract = TradeCurrency + ('.swap' if self._UseQuarter else '.quarter') self._OnlyTrendJudgment = OnlyTrendJudgment self._EnableMessageSend = EnableMessageSend # Trend judgment self._RunInKLinePeriod = RunInKLinePeriod self._KLinePeriod = KLinePeriod self._EmaLength = EmaLength self._EmaCoefficient = EmaCoefficient self._UseStddev = UseStddev self._UseRecordsMiddleValue = UseRecordsMiddleValue self._StddevLength = StddevLength self._StddevDeviations = StddevDeviations # Order settings self._MarginLevel = MarginLevel self._OrderSize = OrderSize self._OrderByMargin = OrderByMargin self._OrderMarginPercent = OrderMarginPercent self._PricePrecision = None self._AmountPrecision = None self._OneSizeInCurrentCoin = None self._QuarterOneSizeValue = None # Take profit and stop loss self._UseStopLoss = UseStopLoss self._StopLossPercent = StopLossPercent self._UseTakeProfit = UseTakeProfit self._TakeProfitPercent = TakeProfitPercent self._UseTrackingTakeProfit = UseTrackingTakeProfit self._UsePositionRetracement = UsePositionRetracement self._TakeProfitTriggerPercent = TakeProfitTriggerPercent self._CallBakcPercent = CallBakcPercent # Strategy variables self._LastBarTime = 0 self._TrendWhenTakeProfitOrStopLoss = 0 self._HadStopLoss = False self._TriggeredTakeProfit = False self._PeakPriceInPosition = 0 self._HadTakeProfit = False self._PriceCrossEMAStatus = 0 # Statistical variables self._InitAsset = 0 self._ProfitLocal = 0 self._TakeProfitCount = 0 self._TradeCount = 0 self.StrategyRunTimeStampString = "strategy_run_time" self._StrategyDatas = {"start_run_timestamp": 0, "others": ""} self._UserDatas = None # Relatively fixed parameters self._MaintenanceMarginRate = 0.004 self._TakerFee = 0.0005 self._IsUsdtStandard = False # Get contract information ticker = _C(exchange.GetTicker, self._UseContract) marketInfo = exchange.GetMarkets()[self._UseContract] Log('Get market information:', marketInfo) self._PricePrecision = marketInfo['PricePrecision'] self._AmountPrecision = marketInfo['AmountPrecision'] self._OneSizeInCurrentCoin = marketInfo['CtVal'] self._QuarterOneSizeValue = marketInfo['CtVal'] exchange.SetCurrency(self._Currency) exchange.SetMarginLevel(self._UseContract, self._MarginLevel) exchange.SetPrecision(self._PricePrecision, self._AmountPrecision) # Initialize data def initDatas(self): self.saveStrategyRunTime() self.readUserDataLocal() self._InitAsset = self._UserDatas["init_assets"] self._ProfitLocal = self._UserDatas["profit_local"] self._TakeProfitCount = self._UserDatas["take_profit_count"] self._TradeCount = self._UserDatas["trade_count"] if self._OrderByMargin: self.getRealOrderSize(-1, self._OrderSize) Log("Order quantity has been recalculated:", self._OrderSize) if self._UseTakeProfit and self._UseTrackingTakeProfit: raise Exception("Take profit and trailing take profit cannot be used simultaneously!") # Set contract def setContract(self): self._IsUsdtStandard = "USDT" in self._Currency exchange.SetCurrency(self._Currency) if self._UseQuarter: exchange.SetContractType("quarter") else: exchange.SetContractType("swap") # Save program start run time (second-level timestamp) def saveStrategyRunTime(self): local_data_strategy_run_time = _G(self.StrategyRunTimeStampString) if local_data_strategy_run_time is None: self._StrategyDatas["start_run_timestamp"] = Unix() _G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"]) else: self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time # Set program start run time (second-level timestamp) def setStrategyRunTime(self, timestamp): _G(self.StrategyRunTimeStampString, timestamp) self._StrategyDatas["start_run_timestamp"] = timestamp # Calculate days between two timestamps (parameters are second-level timestamps) def getDaysFromTimeStamp(self, start_time, end_time): if end_time < start_time: return 0 return (end_time - start_time) // (60 * 60 * 24) # Save data locally def saveUserDatasLocal(self): self._UserDatas = { "init_assets": self._InitAsset, "profit_local": self._ProfitLocal, "take_profit_count": self._TakeProfitCount, "trade_count": self._TradeCount } # Store locally _G(exchange.GetLabel(), self._UserDatas) Log("All data has been saved locally.") # Read user local data, run once when program starts def readUserDataLocal(self): user_data = _G(exchange.GetLabel()) if user_data is None: self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker)) self._UserDatas = { "init_assets": self._InitAsset, "profit_local": 0, "take_profit_count": 0, "trade_count": 0 } else: self._UserDatas = user_data # Clear user local data, run when interactive button is clicked def clearUserDataLocal(self): _G(exchange.GetLabel(), None) Log(exchange.GetLabel(), ": Local data has been cleared.") # Strategy interaction def runCmd(self): cmd = GetCommand() if cmd: # Detect interactive commands Log("Received command:", cmd, "#FF1CAE") if cmd.startswith("ClearLocalData:"): # Clear local data self.clearUserDataLocal() elif cmd.startswith("SaveLocalData:"): # Save data locally self.saveUserDatasLocal() elif cmd.startswith("ClearLog:"): # Clear logs log_reserve = cmd.replace("ClearLog:", "") LogReset(int(log_reserve)) elif cmd.startswith("OrderSize:"): # Modify order quantity if self._OrderByMargin: Log("Order by margin is already enabled, cannot modify order quantity directly!") else: order_size = int(cmd.replace("OrderSize:", "")) self._OrderSize = order_size Log("Order quantity has been modified to:", self._OrderSize) elif cmd.startswith("OrderMarginPercent:"): # Modify order margin percentage if self._OrderByMargin: order_margin_percent = float(cmd.replace("OrderMarginPercent:", "")) self._OrderMarginPercent = order_margin_percent Log("Order margin percentage:", self._OrderMarginPercent, "%") else: Log("Order by margin is not enabled, cannot modify order margin percentage!") # Trading functions def orderDirectly(self, distance, price, amount): tradeFunc = None if amount <= 0: raise Exception("Parameter error, order quantity is already less than 0!") if distance == "buy": tradeFunc = exchange.Buy elif distance == "sell": tradeFunc = exchange.Sell elif distance == "closebuy": tradeFunc = exchange.Sell else: tradeFunc = exchange.Buy exchange.SetDirection(distance) return tradeFunc(price, amount) def openLong(self, price, amount): real_amount = self.getRealOrderSize(price, amount) return self.orderDirectly("buy", price, real_amount) def openShort(self, price, amount): real_amount = self.getRealOrderSize(price, amount) return self.orderDirectly("sell", price, real_amount) def coverLong(self, price, amount): return self.orderDirectly("closebuy", price, amount) def coverShort(self, price, amount): return self.orderDirectly("closesell", price, amount) # Recalculate order quantity def getRealOrderSize(self, price, amount): real_price = price if price != -1 else _C(exchange.GetTicker).Last if self._OrderByMargin: if self._IsUsdtStandard: self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # USDT-margined quantity (leveraged quantity) else: self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # Coin-margined quantity (leveraged quantity) else: self._OrderSize = amount return self._OrderSize # Get margin occupied by single position def getSinglePositionMargin(self, position, ticker): position_margin = 0 if len(position) > 0: if self._IsUsdtStandard: position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel else: position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel return position_margin # Get profit and profit percentage of unidirectional position def getSinglePositionProfit(self, position, ticker): if len(position) == 0: return [0, 0] price = ticker.Last position_margin = self.getSinglePositionMargin(position, ticker) position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel position_profit = position_margin * position_profit_percent return [position_profit, position_profit_percent] # Calculate liquidation price def calculateForcedPrice(self, account, position, ticker): position_profit = 0 total_avail_balance = 0 forced_price = 0 position_margin = self.getSinglePositionMargin(position, ticker) [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker) if self._IsUsdtStandard: total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance if position[0].Type == PD_LONG: forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount) else: forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount) else: total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks if position[0].Type == PD_LONG: forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price) else: forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price) if forced_price < 0: forced_price = 0 return forced_price # Calculate maximum order quantity def getMaxOrderSize(self, margin_level, ticker, account): max_order_size = 0 if self._IsUsdtStandard: max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last) else: max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level return _N(max_order_size, self._AmountPrecision) # Get account assets def getAccountAsset(self, position, account, ticker): # Calculate account initial assets under different situations account_asset = 0 position_margin = self.getSinglePositionMargin(position, ticker) if self._IsUsdtStandard: if len(position) > 0: account_asset = account.Balance + account.FrozenBalance + position_margin else: account_asset = account.Balance + account.FrozenBalance else: if len(position) > 0: account_asset = account.Stocks + account.FrozenStocks + position_margin else: account_asset = account.Stocks + account.FrozenStocks return account_asset # Profit statistics def calculateProfit(self, ticker): # Re-obtain account position and assets position = _C(exchange.GetPosition) account = _C(exchange.GetAccount) # Current total profit - previous total profit = current profit current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal self._ProfitLocal += current_profit if current_profit > 0: self._TakeProfitCount += 1 self._TradeCount += 1 LogProfit(_N(self._ProfitLocal, 4), " Current profit:", _N(current_profit, 6)) self.saveUserDatasLocal() # Check if there are enough funds to place order def isEnoughAssetToOrder(self, order_size, ticker): is_enough = True account = _C(exchange.GetAccount) if self._IsUsdtStandard: if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel: is_enough = False else: if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel: is_enough = False return is_enough # Run strategy core according to K-line period def runInKLinePeriod(self, records): bar_time = records[-1].Time if self._RunInKLinePeriod and self._LastBarTime == bar_time: return False self._LastBarTime = bar_time return True # Trend judgment module (editable specific indicators) def trendJudgment(self, records): # Check if price crosses EMA def checkPriceCrossEma(price, ema_value): if self._PriceCrossEMAStatus == 0: if price <= ema_value: self._PriceCrossEMAStatus = -1 else: self._PriceCrossEMAStatus = 1 elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value): self._PriceCrossEMAStatus = 2 # Completed crossing # EMA long/short judgment ema_long = False ema_short = False price = records[-2].Close # Close price of already closed K-line ema = TA.EMA(records, self._EmaLength) ema_value = ema[-2] # EMA value corresponding to closed K-line ema_upper = ema_value * (1 + self._EmaCoefficient) ema_lower = ema_value * (1 - self._EmaCoefficient) checkPriceCrossEma(price, ema_value) if price > ema_upper: ema_long = True elif price < ema_lower: ema_short = True # Standard deviation judgment in_trend = False if self._UseStddev: records_data = [] for i in range(len(records)): records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close) records_data = np.array(records_data) # Convert list to np.array stddev = np.std(records_data, ddof=1) # Use numpy to calculate standard deviation if stddev > self._StddevDeviations: in_trend = True else: in_trend = True # Trend judgment long = in_trend and ema_long short = in_trend and ema_short if long: Log("Current trend is: Long", self._EnableMessageSend and "@" or "#00FF7F") elif short: Log("Current trend is: Short", self._EnableMessageSend and "@" or "#FF0000") else: Log("Current trend is: Sideways", self._EnableMessageSend and "@" or "#007FFF") return [long, short] # Stop loss def stopLoss(self, position, ticker): stop_loss_price = 0 price = ticker.Last if len(position) == 1 and self._UseStopLoss: if position[0].Type == PD_LONG: stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100) if price < stop_loss_price: self.coverLong(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = 1 self._HadStopLoss = True Log("Long position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") elif position[0].Type == PD_SHORT: stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100) if price > stop_loss_price: self.coverShort(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = -1 self._HadStopLoss = True Log("Short position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") # Take profit def takeProfit(self, position, ticker): take_profit_price = 0 price = ticker.Last if len(position) == 1 and self._UseTakeProfit: if position[0].Type == PD_LONG: take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100) if price > take_profit_price: self.coverLong(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = 1 self._HadTakeProfit = True Log("Long position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") elif position[0].Type == PD_SHORT: take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100) if price < take_profit_price: self.coverShort(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = -1 self._HadTakeProfit = True Log("Short position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") # Trailing take profit def trackingTakeProfit(self, position, ticker): take_profit_price = 0 trigger_price = 0 price = ticker.Last if len(position) > 0 and self._UseTrackingTakeProfit: if position[0].Type == PD_LONG: # Long position holding if self._TriggeredTakeProfit: # Trigger price reached, monitor for take profit self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # Update price peak if self._UsePositionRetracement: take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # Calculate retracement take profit price else: take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # Calculate retracement take profit price if price < take_profit_price: self.coverLong(-1, position[0].Amount) # Close long self.calculateProfit(ticker) self._TriggeredTakeProfit = False # Reset trigger flag self._TrendWhenTakeProfitOrStopLoss = 1 # Record trend when taking profit self._HadTakeProfit = True # Record that take profit occurred Log("Long position trailing take profit: Position price peak:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE") else: # Monitor if trailing take profit trigger price is reached trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100) if price > trigger_price: self._TriggeredTakeProfit = True # Trigger trailing take profit self._PeakPriceInPosition = price # Record price peak Log("Long position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6)) elif position[0].Type == PD_SHORT: # Short position holding if self._TriggeredTakeProfit: # Trigger price reached, monitor for take profit self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # Update price low if self._UsePositionRetracement: take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # Calculate retracement take profit price else: take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # Calculate retracement take profit price if price > take_profit_price: self.coverShort(-1, position[0].Amount) # Close short self.calculateProfit(ticker) self._TriggeredTakeProfit = False # Reset trigger flag self._TrendWhenTakeProfitOrStopLoss = -1 # Record trend when taking profit self._HadTakeProfit = True # Record that take profit occurred Log("Short position trailing take profit: Position price low:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE") else: # Monitor if trailing take profit trigger price is reached trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100) if price < trigger_price: self._TriggeredTakeProfit = True # Trigger trailing take profit self._PeakPriceInPosition = price # Record price low Log("Short position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6)) # Place order def order(self, long, short, position, ticker): position_size = position[0].Amount if len(position) > 0 else 0 position_type = position[0].Type if len(position) > 0 else None if long: # Long trend if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1: # Stop loss or take profit occurred, and trend was long when it happened, do not go long again return if position_size > 0 and position_type == PD_SHORT: self.coverShort(-1, position_size) self.calculateProfit(ticker) elif position_size > 0 and position_type == PD_LONG: # Long position holding, do not place duplicate orders return else: # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order if self._PriceCrossEMAStatus != 2: return if self.isEnoughAssetToOrder(self._OrderSize, ticker): self.openLong(-1, self._OrderSize) self._HadStopLoss = False self._HadTakeProfit = False else: raise Exception("Insufficient order amount!") elif short: # Short trend if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1: # Stop loss or take profit occurred, and trend was short when it happened, do not go short again return if position_size > 0 and position_type == PD_LONG: self.coverLong(-1, position_size) self.calculateProfit(ticker) elif position_size > 0 and position_type == PD_SHORT: # Short position holding, do not place duplicate orders return else: # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order if self._PriceCrossEMAStatus != 2: return if self.isEnoughAssetToOrder(self._OrderSize, ticker): self.openShort(-1, self._OrderSize) self._HadStopLoss = False self._HadTakeProfit = False else: raise Exception("Insufficient order amount!") # Trend strategy def trendStrategy(self): ticker = _C(exchange.GetTicker) position = _C(exchange.GetPosition) account = _C(exchange.GetAccount) records = _C(exchange.GetRecords, self._KLinePeriod * 60) if len(position) > 1: Log(position) raise Exception("Simultaneous long and short positions!") # Strategy interaction self.runCmd() # Status bar information printing self.printLogStatus(ticker, account, position) # Stop loss self.stopLoss(position, ticker) # Take profit self.takeProfit(position, ticker) # Trailing take profit self.trackingTakeProfit(position, ticker) # Run strategy according to K-line period if not self.runInKLinePeriod(records): return # Trend judgment and order placement long = False short = False [long, short] = self.trendJudgment(records) if not self._OnlyTrendJudgment: self.order(long, short, position, ticker) # Status bar information printing def printLogStatus(self, ticker, account, position): table_overview = { "type": "table", "title": "Strategy Overview", "cols": ["Start Time", "Running Days", "Trade Count", "Win Rate", "Est. Monthly %", "Est. Annual %"], "rows": [] } table_account = { "type": "table", "title": "Account Funds", "cols": ["Current Asset", "Initial Asset", "Available Balance", "Frozen Balance", "Max Order Size", "Profit", "Profit %"], "rows": [] } table_position = { "type": "table", "title": "Position Info", "cols": ["Symbol", "Leverage", "Avg Entry Price", "Direction", "Size", "Margin", "Est. Liq. Price", "Unrealized PnL", "Unrealized PnL %"], "rows": [] } i = 0 # Strategy Overview the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix()) monthly_rate_of_profit = 0 if the_running_days > 1: monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30 table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount, 0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"), str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"]) # Account Funds current_asset = self.getAccountAsset(position, account, ticker) max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account) asset_profit = current_asset - self._InitAsset asset_profit_percent = asset_profit / self._InitAsset table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4), _N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4), str(_N(asset_profit_percent * 100, 2)) + "%"]) # Position Info position_direction = "" forced_cover_up_price = 0 position_profit_percent = 0 position_profit = 0 position_margin = 0 if len(position) == 0: table_position["rows"].append(["No Position", "-", "-", "-", "-", "-", "-", "-", "-"]) else: position_direction = "Long" if position[0].Type == PD_LONG else "Short" [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker) position_margin = self.getSinglePositionMargin(position, ticker) forced_cover_up_price = self.calculateForcedPrice(account, position, ticker) table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount, _N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"]) # Print tables LogStatus('`' + json.dumps(table_overview) + '`\n' + '`' + json.dumps(table_account) + '`\n' + '`' + json.dumps(table_position) + '`\n') # main def main(): exchange.IO('simulate', True) strategy = TrendStrategy() strategy.setContract() strategy.initDatas() while True: strategy.trendStrategy() Sleep(strategy._Interval) COMMAND_BLOCK: '''backtest start: 2024-11-26 00:00:00 end: 2024-12-03 00:00:00 period: 1d basePeriod: 1d exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT"}] ''' import json, talib import numpy as np class TrendStrategy: def __init__(self): # Basic settings self._Currency = TradeCurrency self._Interval = Interval self._UseQuarter = UseQuarter self._UseContract = TradeCurrency + ('.swap' if self._UseQuarter else '.quarter') self._OnlyTrendJudgment = OnlyTrendJudgment self._EnableMessageSend = EnableMessageSend # Trend judgment self._RunInKLinePeriod = RunInKLinePeriod self._KLinePeriod = KLinePeriod self._EmaLength = EmaLength self._EmaCoefficient = EmaCoefficient self._UseStddev = UseStddev self._UseRecordsMiddleValue = UseRecordsMiddleValue self._StddevLength = StddevLength self._StddevDeviations = StddevDeviations # Order settings self._MarginLevel = MarginLevel self._OrderSize = OrderSize self._OrderByMargin = OrderByMargin self._OrderMarginPercent = OrderMarginPercent self._PricePrecision = None self._AmountPrecision = None self._OneSizeInCurrentCoin = None self._QuarterOneSizeValue = None # Take profit and stop loss self._UseStopLoss = UseStopLoss self._StopLossPercent = StopLossPercent self._UseTakeProfit = UseTakeProfit self._TakeProfitPercent = TakeProfitPercent self._UseTrackingTakeProfit = UseTrackingTakeProfit self._UsePositionRetracement = UsePositionRetracement self._TakeProfitTriggerPercent = TakeProfitTriggerPercent self._CallBakcPercent = CallBakcPercent # Strategy variables self._LastBarTime = 0 self._TrendWhenTakeProfitOrStopLoss = 0 self._HadStopLoss = False self._TriggeredTakeProfit = False self._PeakPriceInPosition = 0 self._HadTakeProfit = False self._PriceCrossEMAStatus = 0 # Statistical variables self._InitAsset = 0 self._ProfitLocal = 0 self._TakeProfitCount = 0 self._TradeCount = 0 self.StrategyRunTimeStampString = "strategy_run_time" self._StrategyDatas = {"start_run_timestamp": 0, "others": ""} self._UserDatas = None # Relatively fixed parameters self._MaintenanceMarginRate = 0.004 self._TakerFee = 0.0005 self._IsUsdtStandard = False # Get contract information ticker = _C(exchange.GetTicker, self._UseContract) marketInfo = exchange.GetMarkets()[self._UseContract] Log('Get market information:', marketInfo) self._PricePrecision = marketInfo['PricePrecision'] self._AmountPrecision = marketInfo['AmountPrecision'] self._OneSizeInCurrentCoin = marketInfo['CtVal'] self._QuarterOneSizeValue = marketInfo['CtVal'] exchange.SetCurrency(self._Currency) exchange.SetMarginLevel(self._UseContract, self._MarginLevel) exchange.SetPrecision(self._PricePrecision, self._AmountPrecision) # Initialize data def initDatas(self): self.saveStrategyRunTime() self.readUserDataLocal() self._InitAsset = self._UserDatas["init_assets"] self._ProfitLocal = self._UserDatas["profit_local"] self._TakeProfitCount = self._UserDatas["take_profit_count"] self._TradeCount = self._UserDatas["trade_count"] if self._OrderByMargin: self.getRealOrderSize(-1, self._OrderSize) Log("Order quantity has been recalculated:", self._OrderSize) if self._UseTakeProfit and self._UseTrackingTakeProfit: raise Exception("Take profit and trailing take profit cannot be used simultaneously!") # Set contract def setContract(self): self._IsUsdtStandard = "USDT" in self._Currency exchange.SetCurrency(self._Currency) if self._UseQuarter: exchange.SetContractType("quarter") else: exchange.SetContractType("swap") # Save program start run time (second-level timestamp) def saveStrategyRunTime(self): local_data_strategy_run_time = _G(self.StrategyRunTimeStampString) if local_data_strategy_run_time is None: self._StrategyDatas["start_run_timestamp"] = Unix() _G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"]) else: self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time # Set program start run time (second-level timestamp) def setStrategyRunTime(self, timestamp): _G(self.StrategyRunTimeStampString, timestamp) self._StrategyDatas["start_run_timestamp"] = timestamp # Calculate days between two timestamps (parameters are second-level timestamps) def getDaysFromTimeStamp(self, start_time, end_time): if end_time < start_time: return 0 return (end_time - start_time) // (60 * 60 * 24) # Save data locally def saveUserDatasLocal(self): self._UserDatas = { "init_assets": self._InitAsset, "profit_local": self._ProfitLocal, "take_profit_count": self._TakeProfitCount, "trade_count": self._TradeCount } # Store locally _G(exchange.GetLabel(), self._UserDatas) Log("All data has been saved locally.") # Read user local data, run once when program starts def readUserDataLocal(self): user_data = _G(exchange.GetLabel()) if user_data is None: self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker)) self._UserDatas = { "init_assets": self._InitAsset, "profit_local": 0, "take_profit_count": 0, "trade_count": 0 } else: self._UserDatas = user_data # Clear user local data, run when interactive button is clicked def clearUserDataLocal(self): _G(exchange.GetLabel(), None) Log(exchange.GetLabel(), ": Local data has been cleared.") # Strategy interaction def runCmd(self): cmd = GetCommand() if cmd: # Detect interactive commands Log("Received command:", cmd, "#FF1CAE") if cmd.startswith("ClearLocalData:"): # Clear local data self.clearUserDataLocal() elif cmd.startswith("SaveLocalData:"): # Save data locally self.saveUserDatasLocal() elif cmd.startswith("ClearLog:"): # Clear logs log_reserve = cmd.replace("ClearLog:", "") LogReset(int(log_reserve)) elif cmd.startswith("OrderSize:"): # Modify order quantity if self._OrderByMargin: Log("Order by margin is already enabled, cannot modify order quantity directly!") else: order_size = int(cmd.replace("OrderSize:", "")) self._OrderSize = order_size Log("Order quantity has been modified to:", self._OrderSize) elif cmd.startswith("OrderMarginPercent:"): # Modify order margin percentage if self._OrderByMargin: order_margin_percent = float(cmd.replace("OrderMarginPercent:", "")) self._OrderMarginPercent = order_margin_percent Log("Order margin percentage:", self._OrderMarginPercent, "%") else: Log("Order by margin is not enabled, cannot modify order margin percentage!") # Trading functions def orderDirectly(self, distance, price, amount): tradeFunc = None if amount <= 0: raise Exception("Parameter error, order quantity is already less than 0!") if distance == "buy": tradeFunc = exchange.Buy elif distance == "sell": tradeFunc = exchange.Sell elif distance == "closebuy": tradeFunc = exchange.Sell else: tradeFunc = exchange.Buy exchange.SetDirection(distance) return tradeFunc(price, amount) def openLong(self, price, amount): real_amount = self.getRealOrderSize(price, amount) return self.orderDirectly("buy", price, real_amount) def openShort(self, price, amount): real_amount = self.getRealOrderSize(price, amount) return self.orderDirectly("sell", price, real_amount) def coverLong(self, price, amount): return self.orderDirectly("closebuy", price, amount) def coverShort(self, price, amount): return self.orderDirectly("closesell", price, amount) # Recalculate order quantity def getRealOrderSize(self, price, amount): real_price = price if price != -1 else _C(exchange.GetTicker).Last if self._OrderByMargin: if self._IsUsdtStandard: self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # USDT-margined quantity (leveraged quantity) else: self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # Coin-margined quantity (leveraged quantity) else: self._OrderSize = amount return self._OrderSize # Get margin occupied by single position def getSinglePositionMargin(self, position, ticker): position_margin = 0 if len(position) > 0: if self._IsUsdtStandard: position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel else: position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel return position_margin # Get profit and profit percentage of unidirectional position def getSinglePositionProfit(self, position, ticker): if len(position) == 0: return [0, 0] price = ticker.Last position_margin = self.getSinglePositionMargin(position, ticker) position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel position_profit = position_margin * position_profit_percent return [position_profit, position_profit_percent] # Calculate liquidation price def calculateForcedPrice(self, account, position, ticker): position_profit = 0 total_avail_balance = 0 forced_price = 0 position_margin = self.getSinglePositionMargin(position, ticker) [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker) if self._IsUsdtStandard: total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance if position[0].Type == PD_LONG: forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount) else: forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount) else: total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks if position[0].Type == PD_LONG: forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price) else: forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price) if forced_price < 0: forced_price = 0 return forced_price # Calculate maximum order quantity def getMaxOrderSize(self, margin_level, ticker, account): max_order_size = 0 if self._IsUsdtStandard: max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last) else: max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level return _N(max_order_size, self._AmountPrecision) # Get account assets def getAccountAsset(self, position, account, ticker): # Calculate account initial assets under different situations account_asset = 0 position_margin = self.getSinglePositionMargin(position, ticker) if self._IsUsdtStandard: if len(position) > 0: account_asset = account.Balance + account.FrozenBalance + position_margin else: account_asset = account.Balance + account.FrozenBalance else: if len(position) > 0: account_asset = account.Stocks + account.FrozenStocks + position_margin else: account_asset = account.Stocks + account.FrozenStocks return account_asset # Profit statistics def calculateProfit(self, ticker): # Re-obtain account position and assets position = _C(exchange.GetPosition) account = _C(exchange.GetAccount) # Current total profit - previous total profit = current profit current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal self._ProfitLocal += current_profit if current_profit > 0: self._TakeProfitCount += 1 self._TradeCount += 1 LogProfit(_N(self._ProfitLocal, 4), " Current profit:", _N(current_profit, 6)) self.saveUserDatasLocal() # Check if there are enough funds to place order def isEnoughAssetToOrder(self, order_size, ticker): is_enough = True account = _C(exchange.GetAccount) if self._IsUsdtStandard: if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel: is_enough = False else: if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel: is_enough = False return is_enough # Run strategy core according to K-line period def runInKLinePeriod(self, records): bar_time = records[-1].Time if self._RunInKLinePeriod and self._LastBarTime == bar_time: return False self._LastBarTime = bar_time return True # Trend judgment module (editable specific indicators) def trendJudgment(self, records): # Check if price crosses EMA def checkPriceCrossEma(price, ema_value): if self._PriceCrossEMAStatus == 0: if price <= ema_value: self._PriceCrossEMAStatus = -1 else: self._PriceCrossEMAStatus = 1 elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value): self._PriceCrossEMAStatus = 2 # Completed crossing # EMA long/short judgment ema_long = False ema_short = False price = records[-2].Close # Close price of already closed K-line ema = TA.EMA(records, self._EmaLength) ema_value = ema[-2] # EMA value corresponding to closed K-line ema_upper = ema_value * (1 + self._EmaCoefficient) ema_lower = ema_value * (1 - self._EmaCoefficient) checkPriceCrossEma(price, ema_value) if price > ema_upper: ema_long = True elif price < ema_lower: ema_short = True # Standard deviation judgment in_trend = False if self._UseStddev: records_data = [] for i in range(len(records)): records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close) records_data = np.array(records_data) # Convert list to np.array stddev = np.std(records_data, ddof=1) # Use numpy to calculate standard deviation if stddev > self._StddevDeviations: in_trend = True else: in_trend = True # Trend judgment long = in_trend and ema_long short = in_trend and ema_short if long: Log("Current trend is: Long", self._EnableMessageSend and "@" or "#00FF7F") elif short: Log("Current trend is: Short", self._EnableMessageSend and "@" or "#FF0000") else: Log("Current trend is: Sideways", self._EnableMessageSend and "@" or "#007FFF") return [long, short] # Stop loss def stopLoss(self, position, ticker): stop_loss_price = 0 price = ticker.Last if len(position) == 1 and self._UseStopLoss: if position[0].Type == PD_LONG: stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100) if price < stop_loss_price: self.coverLong(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = 1 self._HadStopLoss = True Log("Long position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") elif position[0].Type == PD_SHORT: stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100) if price > stop_loss_price: self.coverShort(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = -1 self._HadStopLoss = True Log("Short position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") # Take profit def takeProfit(self, position, ticker): take_profit_price = 0 price = ticker.Last if len(position) == 1 and self._UseTakeProfit: if position[0].Type == PD_LONG: take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100) if price > take_profit_price: self.coverLong(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = 1 self._HadTakeProfit = True Log("Long position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") elif position[0].Type == PD_SHORT: take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100) if price < take_profit_price: self.coverShort(-1, position[0].Amount) self.calculateProfit(ticker) self._TrendWhenTakeProfitOrStopLoss = -1 self._HadTakeProfit = True Log("Short position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE") # Trailing take profit def trackingTakeProfit(self, position, ticker): take_profit_price = 0 trigger_price = 0 price = ticker.Last if len(position) > 0 and self._UseTrackingTakeProfit: if position[0].Type == PD_LONG: # Long position holding if self._TriggeredTakeProfit: # Trigger price reached, monitor for take profit self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # Update price peak if self._UsePositionRetracement: take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # Calculate retracement take profit price else: take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # Calculate retracement take profit price if price < take_profit_price: self.coverLong(-1, position[0].Amount) # Close long self.calculateProfit(ticker) self._TriggeredTakeProfit = False # Reset trigger flag self._TrendWhenTakeProfitOrStopLoss = 1 # Record trend when taking profit self._HadTakeProfit = True # Record that take profit occurred Log("Long position trailing take profit: Position price peak:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE") else: # Monitor if trailing take profit trigger price is reached trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100) if price > trigger_price: self._TriggeredTakeProfit = True # Trigger trailing take profit self._PeakPriceInPosition = price # Record price peak Log("Long position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6)) elif position[0].Type == PD_SHORT: # Short position holding if self._TriggeredTakeProfit: # Trigger price reached, monitor for take profit self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # Update price low if self._UsePositionRetracement: take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # Calculate retracement take profit price else: take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # Calculate retracement take profit price if price > take_profit_price: self.coverShort(-1, position[0].Amount) # Close short self.calculateProfit(ticker) self._TriggeredTakeProfit = False # Reset trigger flag self._TrendWhenTakeProfitOrStopLoss = -1 # Record trend when taking profit self._HadTakeProfit = True # Record that take profit occurred Log("Short position trailing take profit: Position price low:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE") else: # Monitor if trailing take profit trigger price is reached trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100) if price < trigger_price: self._TriggeredTakeProfit = True # Trigger trailing take profit self._PeakPriceInPosition = price # Record price low Log("Short position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6)) # Place order def order(self, long, short, position, ticker): position_size = position[0].Amount if len(position) > 0 else 0 position_type = position[0].Type if len(position) > 0 else None if long: # Long trend if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1: # Stop loss or take profit occurred, and trend was long when it happened, do not go long again return if position_size > 0 and position_type == PD_SHORT: self.coverShort(-1, position_size) self.calculateProfit(ticker) elif position_size > 0 and position_type == PD_LONG: # Long position holding, do not place duplicate orders return else: # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order if self._PriceCrossEMAStatus != 2: return if self.isEnoughAssetToOrder(self._OrderSize, ticker): self.openLong(-1, self._OrderSize) self._HadStopLoss = False self._HadTakeProfit = False else: raise Exception("Insufficient order amount!") elif short: # Short trend if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1: # Stop loss or take profit occurred, and trend was short when it happened, do not go short again return if position_size > 0 and position_type == PD_LONG: self.coverLong(-1, position_size) self.calculateProfit(ticker) elif position_size > 0 and position_type == PD_SHORT: # Short position holding, do not place duplicate orders return else: # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order if self._PriceCrossEMAStatus != 2: return if self.isEnoughAssetToOrder(self._OrderSize, ticker): self.openShort(-1, self._OrderSize) self._HadStopLoss = False self._HadTakeProfit = False else: raise Exception("Insufficient order amount!") # Trend strategy def trendStrategy(self): ticker = _C(exchange.GetTicker) position = _C(exchange.GetPosition) account = _C(exchange.GetAccount) records = _C(exchange.GetRecords, self._KLinePeriod * 60) if len(position) > 1: Log(position) raise Exception("Simultaneous long and short positions!") # Strategy interaction self.runCmd() # Status bar information printing self.printLogStatus(ticker, account, position) # Stop loss self.stopLoss(position, ticker) # Take profit self.takeProfit(position, ticker) # Trailing take profit self.trackingTakeProfit(position, ticker) # Run strategy according to K-line period if not self.runInKLinePeriod(records): return # Trend judgment and order placement long = False short = False [long, short] = self.trendJudgment(records) if not self._OnlyTrendJudgment: self.order(long, short, position, ticker) # Status bar information printing def printLogStatus(self, ticker, account, position): table_overview = { "type": "table", "title": "Strategy Overview", "cols": ["Start Time", "Running Days", "Trade Count", "Win Rate", "Est. Monthly %", "Est. Annual %"], "rows": [] } table_account = { "type": "table", "title": "Account Funds", "cols": ["Current Asset", "Initial Asset", "Available Balance", "Frozen Balance", "Max Order Size", "Profit", "Profit %"], "rows": [] } table_position = { "type": "table", "title": "Position Info", "cols": ["Symbol", "Leverage", "Avg Entry Price", "Direction", "Size", "Margin", "Est. Liq. Price", "Unrealized PnL", "Unrealized PnL %"], "rows": [] } i = 0 # Strategy Overview the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix()) monthly_rate_of_profit = 0 if the_running_days > 1: monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30 table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount, 0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"), str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"]) # Account Funds current_asset = self.getAccountAsset(position, account, ticker) max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account) asset_profit = current_asset - self._InitAsset asset_profit_percent = asset_profit / self._InitAsset table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4), _N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4), str(_N(asset_profit_percent * 100, 2)) + "%"]) # Position Info position_direction = "" forced_cover_up_price = 0 position_profit_percent = 0 position_profit = 0 position_margin = 0 if len(position) == 0: table_position["rows"].append(["No Position", "-", "-", "-", "-", "-", "-", "-", "-"]) else: position_direction = "Long" if position[0].Type == PD_LONG else "Short" [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker) position_margin = self.getSinglePositionMargin(position, ticker) forced_cover_up_price = self.calculateForcedPrice(account, position, ticker) table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount, _N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"]) # Print tables LogStatus('`' + json.dumps(table_overview) + '`\n' + '`' + json.dumps(table_account) + '`\n' + '`' + json.dumps(table_position) + '`\n') # main def main(): exchange.IO('simulate', True) strategy = TrendStrategy() strategy.setContract() strategy.initDatas() while True: strategy.trendStrategy() Sleep(strategy._Interval) - Basic configuration: Set trading currency, contract type, take-profit and stop-loss rules, etc. - Market information: Obtain contract price precision, quantity precision, etc., ensuring order legality. - Variable initialization: Including trend judgment, take-profit and stop-loss parameters, statistical variables, etc., helping the strategy make decisions based on market conditions. - Exchange setup: Configure exchange API interfaces based on market information, such as setting margin, precision, etc. - Save strategy runtime. - Read local user data. - Initialize account assets, profit statistics, and other data. - Check if take-profit and trailing take-profit are enabled simultaneously. - Check if runtime is already saved locally. - If not saved, record current time and save locally. - If already saved, read the locally saved time. - Use platform's _G function to save the input timestamp locally. - Update the start runtime in strategy data. - Check if end time is earlier than start time; if so, return 0. - Calculate the difference in seconds between two timestamps and convert to days. - Return the day difference. - Package account assets, profit statistics, and other data. - Use platform's _G function to save data locally. - Check if there is saved data locally. - If not, initialize data and save locally. - If yes, read and load into strategy. - Use platform's _G function to clear local data. - Record the clearing operation. - Get user-sent commands. - Execute corresponding operations based on command type, such as clearing local data, modifying order quantities, etc. - Record command execution results. - Select trading function (buy or sell) based on direction. - Set trading direction. - Execute order placement and return results. - Calculate actual order quantity. - Call orderDirectly function to execute buy operation. - Calculate actual order quantity. - Call orderDirectly function to execute sell operation. - Call orderDirectly function to execute sell operation. - Call orderDirectly function to execute buy operation. - Calculate actual order quantity based on whether ordering by margin ratio. - Return calculated order quantity. - Calculate margin based on position direction and quantity. - Return calculation result. - Calculate profit based on position direction and current price. - Return profit and profit rate. - Calculate liquidation price based on position direction and account balance. - Return calculation result. - Calculate maximum order quantity based on account balance and leverage. - Return calculation result. - Calculate total assets based on positions and account balance. - Return calculation result. - Calculate the difference between current total profit and initial assets. - Record profit and update statistical variables. - Save profit data locally. - Get account balance information. - Calculate required funds based on trading currency type (USDT-margined or coin-margined). - Check if account balance meets order requirements. - Return boolean value indicating if funds are sufficient. - Check if current K-line has been processed. - If not processed, mark as processed and return True; otherwise return False. - RSI (Relative Strength Index): Judge overbought/oversold conditions - MACD (Moving Average Convergence Divergence): Identify trend reversal points - Bollinger Bands: Trend judgment based on price volatility - KDJ Indicator: Comprehensive judgment combining momentum and trend - Multi-indicator combination: Can combine multiple indicators for more - Calculate EMA indicator and judge if price crosses EMA. - Judge if in trending state based on standard deviation. - Return current trend (long, short, or sideways). - Check if positions meet stop-loss conditions. - If met, execute position closing and record stop-loss information. - Check if positions meet take-profit conditions. - If met, execute position closing and record take-profit information. - Check if positions meet trailing take-profit trigger conditions. - If met, execute position closing and record take-profit information. - Check current position situation. - Execute opening or closing operations based on trend judgment results. - Get market data: Obtain current quotes, position information, account information, and K-line data. - Check positions: Ensure no simultaneous long and short positions; otherwise throw exception. - Strategy interaction: Process commands sent by users through interactive interface. - Status bar information printing: Update and print strategy running status, account information, and position situation. - Stop-loss: Check and execute stop-loss operations based on stop-loss rules. - Take-profit: Check and execute take-profit operations based on take-profit rules. - Trailing take-profit: Check and execute take-profit operations based on trailing take-profit rules. - K-line period check: Ensure strategy logic executes according to K-line period. - Trend judgment: Judge current trend (long, short, or sideways) based on technical indicators. - Order placement: Execute opening or closing operations based on trend judgment results. - Build table data for strategy overview, account funds, and position situation. - Use LogStatus function to output table data to status bar. - Initialize exchange simulation environment. - Create strategy instance and initialize data. - Cyclically execute strategy logic, checking market conditions and executing trading operations at regular intervals.