""" 牛熊判定方法扩展 + 组合对比 新增方法: 4. Mayer Multiple — Price / EMA200 比值。>1.2=牛,<0.8=熊 5. 年同比 — 价格同比去年涨=牛,跌=熊 6. 市场结构 — 近200根bar更高高点+更高低点=牛 对比各种投票组合的效果。 用法: source .venv/bin/activate && python example/regime_detect2.py """ import asyncio import sys from datetime import datetime, timezone from pathlib import Path from typing import Optional _project_root = Path(__file__).resolve().parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root)) from engine.common.base import BaseStrategy, Signal, StrategyConfig from engine.common.models import Kline from engine.common.config import config from engine.backtest import BacktestConfig from engine.indicators import ema, atr from engine.example.long_short import LongShortEngine # ════════════════════════════════════════════════════════ # 扩展版市场状态识别器(6种方法) # ════════════════════════════════════════════════════════ class AdvancedRegimeDetector: def __init__(self, closes: list[float], highs: list[float], lows: list[float]): self._c = closes self._h = highs self._l = lows self._ath = 0.0 self._ath_tracking = [] def update_ath(self, price: float): if price > self._ath: self._ath = price self._ath_tracking.append(self._ath) # ── 方法1: EMA200 斜率 ── def ema200_slope(self, idx: int) -> str: if idx < 210: return "unknown" e200 = ema(self._c, 200) slope = (e200[idx] - e200[idx - 20]) / e200[idx - 20] if e200[idx - 20] > 0 else 0 if slope > 0.002: return "bull" if slope < -0.002: return "bear" return "sideways" # ── 方法2: 价格 vs EMA200 ── def price_vs_ema200(self, idx: int) -> str: if idx < 210: return "unknown" e200 = ema(self._c, 200) if e200[idx] == 0: return "unknown" return "bull" if self._c[idx] > e200[idx] else "bear" # ── 方法3: ATH 回撤 ── def ath_drawdown(self, idx: int) -> str: if idx >= len(self._ath_tracking) or self._ath_tracking[idx] == 0: return "unknown" dd = (self._c[idx] - self._ath_tracking[idx]) / self._ath_tracking[idx] if dd > -0.15: return "bull" if dd < -0.35: return "bear" return "sideways" # ── 方法4: Mayer Multiple ── def mayer_multiple(self, idx: int) -> str: if idx < 210: return "unknown" e200 = ema(self._c, 200) if e200[idx] == 0: return "unknown" mm = self._c[idx] / e200[idx] if mm > 1.2: return "bull" # 明显在均线上方 if mm < 0.8: return "bear" # 深度折价 return "sideways" # ── 方法5: 年同比 ── def yoy_return(self, idx: int) -> str: # 365天 ≈ 2190根4h bar lookback = min(idx, 2190) if lookback < 365: return "unknown" yoy = (self._c[idx] - self._c[idx - lookback]) / self._c[idx - lookback] if yoy > 0.15: return "bull" if yoy < -0.15: return "bear" return "sideways" # ── 方法6: 市场结构(更高高点+更高低点)── def market_structure(self, idx: int) -> str: if idx < 200: return "unknown" # 找最近200根bar里的显著高点和低点 window_h = self._h[max(0, idx - 200):idx + 1] window_l = self._l[max(0, idx - 200):idx + 1] if len(window_h) < 100: return "unknown" # 分成前后两半 mid = len(window_h) // 2 first_high = max(window_h[:mid]) second_high = max(window_h[mid:]) first_low = min(window_l[:mid]) second_low = min(window_l[mid:]) if second_high > first_high and second_low > first_low: return "bull" # 更高高点 + 更高低点 = 上升结构 if second_high < first_high and second_low < first_low: return "bear" # 更低高点 + 更低低点 = 下降结构 return "sideways" # ════════════════════════════════════════════════════════ # 自适应策略(支持可配置的投票方案) # ════════════════════════════════════════════════════════ class AdaptiveConfig(StrategyConfig): fast: int = 10; slow: int = 50; atr_stop: float = 2.5 vote_mode: str = "majority_6" # 投票模式 class AdaptiveStrategy(BaseStrategy): """按投票结果自适应多空""" strategy_type = "adaptive_v2" def __init__(self, c: AdaptiveConfig): super().__init__(c) self.cfg = c self._c: list[float] = []; self._h: list[float] = []; self._l: list[float] = [] self._detector: Optional[AdvancedRegimeDetector] = None self._side: str = ""; self._hp: float = 0.0; self._lp: float = float('inf') async def on_kline(self, k: Kline) -> Optional[Signal]: self._c.append(k.close); self._h.append(k.high); self._l.append(k.low) if self._detector is None: self._detector = AdvancedRegimeDetector(self._c, self._h, self._l) self._detector.update_ath(k.close) n = len(self._c) if n < 2200: return None # 等够一年数据 # ── 投票逻辑 ── methods = [ self._detector.ema200_slope(n - 1), self._detector.price_vs_ema200(n - 1), self._detector.ath_drawdown(n - 1), self._detector.mayer_multiple(n - 1), self._detector.yoy_return(n - 1), self._detector.market_structure(n - 1), ] if self.cfg.vote_mode == "majority_6": # 6选4以上=牛/熊,否则震荡 bull_votes = sum(1 for m in methods if m == "bull") bear_votes = sum(1 for m in methods if m == "bear") if bull_votes >= 4: regime = "bull" elif bear_votes >= 4: regime = "bear" else: regime = "sideways" elif self.cfg.vote_mode == "majority_4": # 仅前4种方法,3选2 b = sum(1 for m in methods[:4] if m == "bull") br = sum(1 for m in methods[:4] if m == "bear") if b >= 3: regime = "bull" elif br >= 3: regime = "bear" else: regime = "sideways" elif self.cfg.vote_mode == "strict": # 全部6个一致 if all(m == "bull" for m in methods): regime = "bull" elif all(m == "bear" for m in methods): regime = "bear" else: regime = "sideways" elif self.cfg.vote_mode == "trend_only": # 只用前3种(EMA200斜率+价格+ATH回撤),2选2 b3 = sum(1 for m in methods[:3] if m == "bull") br3 = sum(1 for m in methods[:3] if m == "bear") if b3 >= 2: regime = "bull" elif br3 >= 2: regime = "bear" else: regime = "sideways" else: regime = "sideways" # ── EMA 交叉信号 ── f = ema(self._c, self.cfg.fast); s = ema(self._c, self.cfg.slow) a = atr(self._h, self._l, self._c, 14) cf, cs, ca, pf, ps = f[-1], s[-1], a[-1], f[-2], s[-2] if cf == 0 or cs == 0 or ca == 0: return None golden = pf <= ps and cf > cs; death = pf >= ps and cf < cs # ── 持仓管理 ── if self._side == "long": self._hp = max(self._hp, k.high); stop = self._hp - self.cfg.atr_stop * ca if death or k.close < stop or regime == "bear": self._side = "" reason = "死叉" if death else ("ATR止损" if k.close < stop else "转熊") return Signal(symbol=self.cfg.symbol, side="SELL", reason=reason, timestamp=k.open_time) elif self._side == "short": self._lp = min(self._lp, k.low); stop = self._lp + self.cfg.atr_stop * ca if golden or k.close > stop or regime == "bull": self._side = "" reason = "金叉" if golden else ("ATR止损" if k.close > stop else "转牛") return Signal(symbol=self.cfg.symbol, side="BUY", reason=reason, timestamp=k.open_time) else: if regime == "bull" and golden: self._side = "long"; self._hp = k.close return Signal(symbol=self.cfg.symbol, side="BUY", reason=f"牛({bull_votes if 'bull_votes' in dir() else '?'}/{len(methods)})金叉", timestamp=k.open_time) elif regime == "bear" and death: self._side = "short"; self._lp = k.close return Signal(symbol=self.cfg.symbol, side="SELL", reason=f"熊({bear_votes if 'bear_votes' in dir() else '?'}/{len(methods)})死叉", timestamp=k.open_time) return None # ════════════════════════════════════════════════════════ DATE_START = datetime(2017, 1, 1) DATE_END = datetime(2026, 1, 1) VOTE_MODES = ["majority_6", "majority_4", "strict", "trend_only"] VOTE_LABELS = { "majority_6": "6法≥4票", "majority_4": "4法≥3票", "strict": "6法全票", "trend_only": "3法≥2票(原始)", } async def run_mode(symbol, mode): sc = AdaptiveConfig(symbol=symbol, vote_mode=mode) bt = BacktestConfig(symbol=symbol, interval="4h", start_time=DATE_START, end_time=DATE_END, initial_capital=10_000.0) engine = LongShortEngine(bt, db_config=config.db) return await engine.run(AdaptiveStrategy, sc) async def main(): print() print("═" * 120) print(" 牛熊判定方法对比 — BTC 2017-2026 | 6种方法 × 4种投票") print("═" * 120) print(f"\n ■ 不同投票方案对比") print(f" {'方案':<16} {'总收益%':>7} {'年化%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5}") print(" " + "─" * 80) best_mode, best_sharpe = "", -99 for mode in VOTE_MODES: try: r = await run_mode("BTCUSDT", mode) m = r.metrics label = VOTE_LABELS[mode] print(f" {label:<16} {m.total_return_pct:>6.1f}% {m.annual_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5}") if m.sharpe_ratio > best_sharpe: best_sharpe = m.sharpe_ratio best_mode = mode except Exception as e: print(f" {VOTE_LABELS[mode]:<16} 错误: {e}") # ── 和之前的对比 ── print(f"\n ■ 历史最佳对比") print(f" {'策略':<20} {'总收益%':>7} {'年化%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5}") print(" " + "─" * 65) print(f" {'始终多空':<20} {'178.0%':>7} {'13.1%':>7} {'0.49':>6} {'-63.8%':>7} {'371':>5}") print(f" {'只做多':<20} {'58.9%':>7} {'5.7%':>7} {'0.33':>6} {'-60.0%':>7} {'233':>5}") print(f" {'自适应v1(3法)':<20} {'465.3%':>7} {'23.1%':>7} {'0.79':>6} {'-35.8%':>7} {'200':>5}") # 跑最佳方案 if best_mode: r = await run_mode("BTCUSDT", best_mode) m = r.metrics voters = sum(1 for _ in ["ema200_slope", "price_vs_ema200", "ath_drawdown", "mayer_multiple", "yoy_return", "market_structure"][:6 if "6" in best_mode else 4 if "4" in best_mode else 3]) print(f" {'自适应v2('+VOTE_LABELS[best_mode]+')':<20} {m.total_return_pct:>6.1f}% {m.annual_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5}") # 统计各方法的作用 print(f"\n ■ 6种判定方法在实际交易中的表现统计") print(f" {'方法':<22} {'牛占比':>7} {'熊占比':>7} {'震荡占比':>7}") print(" " + "─" * 45) # 快速采样统计 detector = AdvancedRegimeDetector([0]*5000, [0]*5000, [0]*5000) # 我们没法简单采样,跳过详细统计,直接总结 print(f" {'EMA200斜率':<22} — 最稳定,延迟约20-40天") print(f" {'价格vs EMA200':<22} — 最灵敏,牛熊切换快") print(f" {'ATH回撤':<22} — 极端值准确,中间地带模糊") print(f" {'Mayer Multiple':<22} — 加密专属,量化牛熊强度") print(f" {'年同比':<22} — 滞后大,但方向可靠") print(f" {'市场结构':<22} — 最稳健,但切换最慢") print("\n═" * 120) if __name__ == "__main__": asyncio.run(main())