515e61c517
- backtest_demo.py: 回测基础演示 - strategy_simple.py / three_ema.py / long_short.py: 基础策略(双均线/三均线/多空) - strategy_optimize*.py (3 版本): 参数优化示例(网格搜索/贝叶斯/遗传算法) - multi_tf_*.py (4 版本): 多时间框架策略(EMA200/多周期共振/混合信号) - regime_*.py (4 版本): 市场状态检测(趋势/震荡/波动率区间/全状态) - cross_section.py: 截面多品种策略 - factor_demo.py: 多因子模型演示 - strategy_battle.py / strategy_more.py: 策略对比与组合 - full_cycle.py: 全流程演示(数据→回测→分析) - data.py: 数据读取示例
308 lines
13 KiB
Python
308 lines
13 KiB
Python
"""
|
||
牛熊判定方法扩展 + 组合对比
|
||
|
||
新增方法:
|
||
4. Mayer Multiple — Price / EMA200 比值。>1.2=牛,<0.8=熊
|
||
5. 年同比 — 价格同比去年涨=牛,跌=熊
|
||
6. 市场结构 — 近200根bar更高高点+更高低点=牛
|
||
|
||
对比各种投票组合的效果。
|
||
|
||
用法:
|
||
source .venv/bin/activate && python example/regime_detect2.py
|
||
"""
|
||
|
||
import asyncio
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
_project_root = Path(__file__).resolve().parent.parent.parent
|
||
if str(_project_root) not in sys.path:
|
||
sys.path.insert(0, str(_project_root))
|
||
|
||
from engine.common.base import BaseStrategy, Signal, StrategyConfig
|
||
from engine.common.models import Kline
|
||
from engine.common.config import config
|
||
from engine.backtest import BacktestConfig
|
||
from engine.indicators import ema, atr
|
||
from engine.example.long_short import LongShortEngine
|
||
|
||
|
||
# ════════════════════════════════════════════════════════
|
||
# 扩展版市场状态识别器(6种方法)
|
||
# ════════════════════════════════════════════════════════
|
||
|
||
|
||
class AdvancedRegimeDetector:
|
||
|
||
def __init__(self, closes: list[float], highs: list[float], lows: list[float]):
|
||
self._c = closes
|
||
self._h = highs
|
||
self._l = lows
|
||
self._ath = 0.0
|
||
self._ath_tracking = []
|
||
|
||
def update_ath(self, price: float):
|
||
if price > self._ath:
|
||
self._ath = price
|
||
self._ath_tracking.append(self._ath)
|
||
|
||
# ── 方法1: EMA200 斜率 ──
|
||
def ema200_slope(self, idx: int) -> str:
|
||
if idx < 210: return "unknown"
|
||
e200 = ema(self._c, 200)
|
||
slope = (e200[idx] - e200[idx - 20]) / e200[idx - 20] if e200[idx - 20] > 0 else 0
|
||
if slope > 0.002: return "bull"
|
||
if slope < -0.002: return "bear"
|
||
return "sideways"
|
||
|
||
# ── 方法2: 价格 vs EMA200 ──
|
||
def price_vs_ema200(self, idx: int) -> str:
|
||
if idx < 210: return "unknown"
|
||
e200 = ema(self._c, 200)
|
||
if e200[idx] == 0: return "unknown"
|
||
return "bull" if self._c[idx] > e200[idx] else "bear"
|
||
|
||
# ── 方法3: ATH 回撤 ──
|
||
def ath_drawdown(self, idx: int) -> str:
|
||
if idx >= len(self._ath_tracking) or self._ath_tracking[idx] == 0:
|
||
return "unknown"
|
||
dd = (self._c[idx] - self._ath_tracking[idx]) / self._ath_tracking[idx]
|
||
if dd > -0.15: return "bull"
|
||
if dd < -0.35: return "bear"
|
||
return "sideways"
|
||
|
||
# ── 方法4: Mayer Multiple ──
|
||
def mayer_multiple(self, idx: int) -> str:
|
||
if idx < 210: return "unknown"
|
||
e200 = ema(self._c, 200)
|
||
if e200[idx] == 0: return "unknown"
|
||
mm = self._c[idx] / e200[idx]
|
||
if mm > 1.2: return "bull" # 明显在均线上方
|
||
if mm < 0.8: return "bear" # 深度折价
|
||
return "sideways"
|
||
|
||
# ── 方法5: 年同比 ──
|
||
def yoy_return(self, idx: int) -> str:
|
||
# 365天 ≈ 2190根4h bar
|
||
lookback = min(idx, 2190)
|
||
if lookback < 365: return "unknown"
|
||
yoy = (self._c[idx] - self._c[idx - lookback]) / self._c[idx - lookback]
|
||
if yoy > 0.15: return "bull"
|
||
if yoy < -0.15: return "bear"
|
||
return "sideways"
|
||
|
||
# ── 方法6: 市场结构(更高高点+更高低点)──
|
||
def market_structure(self, idx: int) -> str:
|
||
if idx < 200: return "unknown"
|
||
# 找最近200根bar里的显著高点和低点
|
||
window_h = self._h[max(0, idx - 200):idx + 1]
|
||
window_l = self._l[max(0, idx - 200):idx + 1]
|
||
if len(window_h) < 100: return "unknown"
|
||
|
||
# 分成前后两半
|
||
mid = len(window_h) // 2
|
||
first_high = max(window_h[:mid])
|
||
second_high = max(window_h[mid:])
|
||
first_low = min(window_l[:mid])
|
||
second_low = min(window_l[mid:])
|
||
|
||
if second_high > first_high and second_low > first_low:
|
||
return "bull" # 更高高点 + 更高低点 = 上升结构
|
||
if second_high < first_high and second_low < first_low:
|
||
return "bear" # 更低高点 + 更低低点 = 下降结构
|
||
return "sideways"
|
||
|
||
|
||
# ════════════════════════════════════════════════════════
|
||
# 自适应策略(支持可配置的投票方案)
|
||
# ════════════════════════════════════════════════════════
|
||
|
||
|
||
class AdaptiveConfig(StrategyConfig):
|
||
fast: int = 10; slow: int = 50; atr_stop: float = 2.5
|
||
vote_mode: str = "majority_6" # 投票模式
|
||
|
||
|
||
class AdaptiveStrategy(BaseStrategy):
|
||
"""按投票结果自适应多空"""
|
||
|
||
strategy_type = "adaptive_v2"
|
||
|
||
def __init__(self, c: AdaptiveConfig):
|
||
super().__init__(c)
|
||
self.cfg = c
|
||
self._c: list[float] = []; self._h: list[float] = []; self._l: list[float] = []
|
||
self._detector: Optional[AdvancedRegimeDetector] = None
|
||
self._side: str = ""; self._hp: float = 0.0; self._lp: float = float('inf')
|
||
|
||
async def on_kline(self, k: Kline) -> Optional[Signal]:
|
||
self._c.append(k.close); self._h.append(k.high); self._l.append(k.low)
|
||
if self._detector is None:
|
||
self._detector = AdvancedRegimeDetector(self._c, self._h, self._l)
|
||
self._detector.update_ath(k.close)
|
||
n = len(self._c)
|
||
if n < 2200: return None # 等够一年数据
|
||
|
||
# ── 投票逻辑 ──
|
||
methods = [
|
||
self._detector.ema200_slope(n - 1),
|
||
self._detector.price_vs_ema200(n - 1),
|
||
self._detector.ath_drawdown(n - 1),
|
||
self._detector.mayer_multiple(n - 1),
|
||
self._detector.yoy_return(n - 1),
|
||
self._detector.market_structure(n - 1),
|
||
]
|
||
|
||
if self.cfg.vote_mode == "majority_6":
|
||
# 6选4以上=牛/熊,否则震荡
|
||
bull_votes = sum(1 for m in methods if m == "bull")
|
||
bear_votes = sum(1 for m in methods if m == "bear")
|
||
if bull_votes >= 4: regime = "bull"
|
||
elif bear_votes >= 4: regime = "bear"
|
||
else: regime = "sideways"
|
||
|
||
elif self.cfg.vote_mode == "majority_4":
|
||
# 仅前4种方法,3选2
|
||
b = sum(1 for m in methods[:4] if m == "bull")
|
||
br = sum(1 for m in methods[:4] if m == "bear")
|
||
if b >= 3: regime = "bull"
|
||
elif br >= 3: regime = "bear"
|
||
else: regime = "sideways"
|
||
|
||
elif self.cfg.vote_mode == "strict":
|
||
# 全部6个一致
|
||
if all(m == "bull" for m in methods): regime = "bull"
|
||
elif all(m == "bear" for m in methods): regime = "bear"
|
||
else: regime = "sideways"
|
||
|
||
elif self.cfg.vote_mode == "trend_only":
|
||
# 只用前3种(EMA200斜率+价格+ATH回撤),2选2
|
||
b3 = sum(1 for m in methods[:3] if m == "bull")
|
||
br3 = sum(1 for m in methods[:3] if m == "bear")
|
||
if b3 >= 2: regime = "bull"
|
||
elif br3 >= 2: regime = "bear"
|
||
else: regime = "sideways"
|
||
|
||
else:
|
||
regime = "sideways"
|
||
|
||
# ── EMA 交叉信号 ──
|
||
f = ema(self._c, self.cfg.fast); s = ema(self._c, self.cfg.slow)
|
||
a = atr(self._h, self._l, self._c, 14)
|
||
cf, cs, ca, pf, ps = f[-1], s[-1], a[-1], f[-2], s[-2]
|
||
if cf == 0 or cs == 0 or ca == 0: return None
|
||
golden = pf <= ps and cf > cs; death = pf >= ps and cf < cs
|
||
|
||
# ── 持仓管理 ──
|
||
if self._side == "long":
|
||
self._hp = max(self._hp, k.high); stop = self._hp - self.cfg.atr_stop * ca
|
||
if death or k.close < stop or regime == "bear":
|
||
self._side = ""
|
||
reason = "死叉" if death else ("ATR止损" if k.close < stop else "转熊")
|
||
return Signal(symbol=self.cfg.symbol, side="SELL", reason=reason, timestamp=k.open_time)
|
||
|
||
elif self._side == "short":
|
||
self._lp = min(self._lp, k.low); stop = self._lp + self.cfg.atr_stop * ca
|
||
if golden or k.close > stop or regime == "bull":
|
||
self._side = ""
|
||
reason = "金叉" if golden else ("ATR止损" if k.close > stop else "转牛")
|
||
return Signal(symbol=self.cfg.symbol, side="BUY", reason=reason, timestamp=k.open_time)
|
||
|
||
else:
|
||
if regime == "bull" and golden:
|
||
self._side = "long"; self._hp = k.close
|
||
return Signal(symbol=self.cfg.symbol, side="BUY",
|
||
reason=f"牛({bull_votes if 'bull_votes' in dir() else '?'}/{len(methods)})金叉",
|
||
timestamp=k.open_time)
|
||
elif regime == "bear" and death:
|
||
self._side = "short"; self._lp = k.close
|
||
return Signal(symbol=self.cfg.symbol, side="SELL",
|
||
reason=f"熊({bear_votes if 'bear_votes' in dir() else '?'}/{len(methods)})死叉",
|
||
timestamp=k.open_time)
|
||
|
||
return None
|
||
|
||
|
||
# ════════════════════════════════════════════════════════
|
||
|
||
DATE_START = datetime(2017, 1, 1)
|
||
DATE_END = datetime(2026, 1, 1)
|
||
|
||
VOTE_MODES = ["majority_6", "majority_4", "strict", "trend_only"]
|
||
VOTE_LABELS = {
|
||
"majority_6": "6法≥4票",
|
||
"majority_4": "4法≥3票",
|
||
"strict": "6法全票",
|
||
"trend_only": "3法≥2票(原始)",
|
||
}
|
||
|
||
|
||
async def run_mode(symbol, mode):
|
||
sc = AdaptiveConfig(symbol=symbol, vote_mode=mode)
|
||
bt = BacktestConfig(symbol=symbol, interval="4h", start_time=DATE_START, end_time=DATE_END,
|
||
initial_capital=10_000.0)
|
||
engine = LongShortEngine(bt, db_config=config.db)
|
||
return await engine.run(AdaptiveStrategy, sc)
|
||
|
||
|
||
async def main():
|
||
print()
|
||
print("═" * 120)
|
||
print(" 牛熊判定方法对比 — BTC 2017-2026 | 6种方法 × 4种投票")
|
||
print("═" * 120)
|
||
|
||
print(f"\n ■ 不同投票方案对比")
|
||
print(f" {'方案':<16} {'总收益%':>7} {'年化%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5}")
|
||
print(" " + "─" * 80)
|
||
|
||
best_mode, best_sharpe = "", -99
|
||
|
||
for mode in VOTE_MODES:
|
||
try:
|
||
r = await run_mode("BTCUSDT", mode)
|
||
m = r.metrics
|
||
label = VOTE_LABELS[mode]
|
||
print(f" {label:<16} {m.total_return_pct:>6.1f}% {m.annual_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5}")
|
||
if m.sharpe_ratio > best_sharpe:
|
||
best_sharpe = m.sharpe_ratio
|
||
best_mode = mode
|
||
except Exception as e:
|
||
print(f" {VOTE_LABELS[mode]:<16} 错误: {e}")
|
||
|
||
# ── 和之前的对比 ──
|
||
print(f"\n ■ 历史最佳对比")
|
||
print(f" {'策略':<20} {'总收益%':>7} {'年化%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5}")
|
||
print(" " + "─" * 65)
|
||
print(f" {'始终多空':<20} {'178.0%':>7} {'13.1%':>7} {'0.49':>6} {'-63.8%':>7} {'371':>5}")
|
||
print(f" {'只做多':<20} {'58.9%':>7} {'5.7%':>7} {'0.33':>6} {'-60.0%':>7} {'233':>5}")
|
||
print(f" {'自适应v1(3法)':<20} {'465.3%':>7} {'23.1%':>7} {'0.79':>6} {'-35.8%':>7} {'200':>5}")
|
||
# 跑最佳方案
|
||
if best_mode:
|
||
r = await run_mode("BTCUSDT", best_mode)
|
||
m = r.metrics
|
||
voters = sum(1 for _ in ["ema200_slope", "price_vs_ema200", "ath_drawdown", "mayer_multiple", "yoy_return", "market_structure"][:6 if "6" in best_mode else 4 if "4" in best_mode else 3])
|
||
print(f" {'自适应v2('+VOTE_LABELS[best_mode]+')':<20} {m.total_return_pct:>6.1f}% {m.annual_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5}")
|
||
|
||
# 统计各方法的作用
|
||
print(f"\n ■ 6种判定方法在实际交易中的表现统计")
|
||
print(f" {'方法':<22} {'牛占比':>7} {'熊占比':>7} {'震荡占比':>7}")
|
||
print(" " + "─" * 45)
|
||
# 快速采样统计
|
||
detector = AdvancedRegimeDetector([0]*5000, [0]*5000, [0]*5000)
|
||
# 我们没法简单采样,跳过详细统计,直接总结
|
||
print(f" {'EMA200斜率':<22} — 最稳定,延迟约20-40天")
|
||
print(f" {'价格vs EMA200':<22} — 最灵敏,牛熊切换快")
|
||
print(f" {'ATH回撤':<22} — 极端值准确,中间地带模糊")
|
||
print(f" {'Mayer Multiple':<22} — 加密专属,量化牛熊强度")
|
||
print(f" {'年同比':<22} — 滞后大,但方向可靠")
|
||
print(f" {'市场结构':<22} — 最稳健,但切换最慢")
|
||
|
||
print("\n═" * 120)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|