""" 横截面动量 — 选强弃弱 + 趋势/均值回归入场 策略: 1. 每根 4h K 线,计算 4 个币种过去 N 根 K 线的收益率 2. 按收益率排名,只有前 2 名允许做多 3. 趋势入场:EMA(10,50) 金叉 + 排名前2 → 买入 4. 回归入场:RSI < 35 + 排名前2 → 回调买入 5. 出场:排名跌出前2 或 EMA死叉 或 ATR止损 币种:BTC/ETH/BNB/SOL | 4h | 2024-2026 """ import asyncio import sys from datetime import datetime, timezone from pathlib import Path from typing import Optional _project_root = Path(__file__).resolve().parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root)) from engine.common.base import BaseStrategy, Signal, StrategyConfig from engine.common.models import Kline from engine.common.config import config from engine.backtest import BacktestEngine, BacktestConfig from engine.data import DataService from engine.indicators import ema, atr, rsi as calc_rsi ALL_SYMBOLS = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"] class CrossSectionConfig(StrategyConfig): lookback: int = 20 # 排名回溯周期 rank_threshold: int = 2 # 只做前N名 ema_fast: int = 10 ema_slow: int = 50 rsi_period: int = 14 rsi_entry: float = 35.0 atr_stop: float = 2.5 data_start: Optional[datetime] = None data_end: Optional[datetime] = None class CrossSectionStrategy(BaseStrategy): """横截面动量 — 只做强势币种""" strategy_type = "cross_section" def __init__(self, c: CrossSectionConfig): super().__init__(c) self.cfg = c # 所有币种的数据 {symbol: [Kline]} self._all_klines: dict[str, list[Kline]] = {} self._all_closes: dict[str, list[float]] = {} # 当前币种的数据 self._closes: list[float] = [] self._highs: list[float] = [] self._lows: list[float] = [] self._highest: float = 0.0 self._in_position = False async def on_start(self): from engine.common.config import config as app_config ds = DataService(app_config.db) await ds.connect() try: for sym in ALL_SYMBOLS: klines = await ds.fetch_klines( symbol=sym, interval="4h", start_time=self.cfg.data_start, end_time=self.cfg.data_end, limit=1_000_000, ) self._all_klines[sym] = klines self._all_closes[sym] = [k.close for k in klines] finally: await ds.close() await super().on_start() def _get_rank(self, ts: float) -> dict[str, float]: """计算所有币种在指定时间戳的排名收益率,返回 {symbol: return%}""" scores = {} for sym in ALL_SYMBOLS: klines = self._all_klines.get(sym, []) if not klines: scores[sym] = -999 continue # 找到时间戳 <= ts 的最新K线索引 idx = len(klines) - 1 for i in range(len(klines) - 1, -1, -1): if klines[i].open_time <= ts: idx = i break # 计算过去 lookback 根K线的收益率 start_idx = max(0, idx - self.cfg.lookback) if start_idx >= idx: scores[sym] = 0 else: start_price = self._all_closes[sym][start_idx] end_price = self._all_closes[sym][idx] scores[sym] = (end_price / start_price - 1) * 100 if start_price > 0 else 0 return scores def _my_rank(self, ts: float) -> int: """当前币种在全部币种中的排名(1=最强)""" scores = self._get_rank(ts) my_score = scores.get(self.cfg.symbol, -999) # 高于我的分数有几个 better = sum(1 for s in scores.values() if s > my_score) return better + 1 async def on_kline(self, k: Kline) -> Optional[Signal]: self._closes.append(k.close) self._highs.append(k.high) self._lows.append(k.low) n = len(self._closes) if n < self.cfg.ema_slow + 10: return None fast = ema(self._closes, self.cfg.ema_fast) slow = ema(self._closes, self.cfg.ema_slow) atr_vals = atr(self._highs, self._lows, self._closes, 14) rsi_vals = calc_rsi(self._closes, self.cfg.rsi_period) cur_f, cur_s = fast[-1], slow[-1] prev_f, prev_s = fast[-2], slow[-2] cur_atr = atr_vals[-1] cur_rsi = rsi_vals[-1] if cur_f == 0 or cur_s == 0 or cur_atr == 0 or cur_rsi == 0: return None rank = self._my_rank(k.open_time) is_top = rank <= self.cfg.rank_threshold golden = prev_f <= prev_s and cur_f > cur_s # 趋势入场 death = prev_f >= prev_s and cur_f < cur_s # 趋势出场 oversold = cur_rsi < self.cfg.rsi_entry # 均值回归入场 # ── 出场 ── if self._in_position: self._highest = max(self._highest, k.high) stop = self._highest - self.cfg.atr_stop * cur_atr if not is_top: self._in_position = False return Signal(symbol=self.cfg.symbol, side="SELL", reason=f"排名跌出前{self.cfg.rank_threshold}(#{rank})", timestamp=k.open_time) if death: self._in_position = False return Signal(symbol=self.cfg.symbol, side="SELL", reason="EMA死叉", timestamp=k.open_time) if k.close < stop: self._in_position = False return Signal(symbol=self.cfg.symbol, side="SELL", reason="ATR止损", timestamp=k.open_time) # ── 入场 ── if not self._in_position and is_top: # 趋势信号:金叉 if golden: self._in_position = True self._highest = k.close return Signal(symbol=self.cfg.symbol, side="BUY", reason=f"金叉+#{rank}横截面动量", timestamp=k.open_time) # 回归信号:RSI超卖 if oversold: self._in_position = True self._highest = k.close return Signal(symbol=self.cfg.symbol, side="BUY", confidence=0.7, # 回归信号稍降仓位 reason=f"RSI超卖+#{rank}横截面动量 RSI={cur_rsi:.0f}", timestamp=k.open_time) return None # ═══════════════════════════════════════════════ DATE_START = datetime(2024, 1, 1) DATE_END = datetime(2026, 1, 1) async def main(): print() print("═" * 110) print(" 横截面动量 — 只做最强 + 趋势/回归双信号 | 4h | 2024-2026") print("═" * 110) print(f" {'币种':<10} {'收益%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5} {'胜率%':>6} {'盈亏比':>6}") print("─" * 110) results = {} for symbol in ALL_SYMBOLS: sc = CrossSectionConfig(symbol=symbol, data_start=DATE_START, data_end=DATE_END) bt = BacktestConfig(symbol=symbol, interval="4h", start_time=DATE_START, end_time=DATE_END, initial_capital=10_000.0) engine = BacktestEngine(bt, db_config=config.db) r = await engine.run(CrossSectionStrategy, sc) m = r.metrics results[symbol] = (m, r) # 统计排名分布和信号类型 trend_signals = sum(1 for t in r.trades if t.side == "BUY" and "金叉" in t.reason) meanrev_signals = sum(1 for t in r.trades if t.side == "BUY" and "RSI" in t.reason) exits_rank = sum(1 for t in r.trades if t.side == "SELL" and "排名" in t.reason) print(f" {symbol:<10} {m.total_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5} {m.win_rate*100:>5.1f}% {m.profit_factor:>6.2f}") print(f" {'':<10} └ 趋势入场:{trend_signals} 回归入场:{meanrev_signals} 排名出场:{exits_rank}") sells = [t for t in r.trades if t.side == "SELL" and t.pnl is not None] for t in sells[-2:]: dt = datetime.fromtimestamp(t.timestamp / 1000, tz=timezone.utc).strftime("%m-%d %H:%M") print(f" {'':<10} └ {dt} {t.pnl:>+8.2f} {t.reason}") # ── 对比 ── print("─" * 110) print("\n ■ 对比:纯趋势跟踪 vs 横截面动量") TREND = { "BTCUSDT": ("EMA v3(10,50)", 39.9, 1.03, 20), "ETHUSDT": ("EMA v3(10,75)", 53.6, 1.04, 18), "BNBUSDT": ("EMA v1(20,50)", 52.0, 0.71, 41), "SOLUSDT": ("EMA v3(30,50)", 73.6, 1.18, 13), } print(f" {'币种':<10} {'纯趋势':>24} → {'横截面动量':>24}") print(f" {'':<10} {'收益% 夏普 交易':>24} → {'收益% 夏普 交易':>24}") for sym in ALL_SYMBOLS: t_name, t_ret, t_sh, t_tr = TREND[sym] m, r = results[sym] print(f" {sym:<10} {t_ret:>5.1f}% {t_sh:>5.2f} {t_tr:>4}次 → {m.total_return_pct:>5.1f}% {m.sharpe_ratio:>5.2f} {m.total_trades:>4}次") print("\n═" * 110) if __name__ == "__main__": asyncio.run(main())