515e61c517
- backtest_demo.py: 回测基础演示 - strategy_simple.py / three_ema.py / long_short.py: 基础策略(双均线/三均线/多空) - strategy_optimize*.py (3 版本): 参数优化示例(网格搜索/贝叶斯/遗传算法) - multi_tf_*.py (4 版本): 多时间框架策略(EMA200/多周期共振/混合信号) - regime_*.py (4 版本): 市场状态检测(趋势/震荡/波动率区间/全状态) - cross_section.py: 截面多品种策略 - factor_demo.py: 多因子模型演示 - strategy_battle.py / strategy_more.py: 策略对比与组合 - full_cycle.py: 全流程演示(数据→回测→分析) - data.py: 数据读取示例
235 lines
9.2 KiB
Python
235 lines
9.2 KiB
Python
"""
|
|
横截面动量 — 选强弃弱 + 趋势/均值回归入场
|
|
|
|
策略:
|
|
1. 每根 4h K 线,计算 4 个币种过去 N 根 K 线的收益率
|
|
2. 按收益率排名,只有前 2 名允许做多
|
|
3. 趋势入场:EMA(10,50) 金叉 + 排名前2 → 买入
|
|
4. 回归入场:RSI < 35 + 排名前2 → 回调买入
|
|
5. 出场:排名跌出前2 或 EMA死叉 或 ATR止损
|
|
|
|
币种:BTC/ETH/BNB/SOL | 4h | 2024-2026
|
|
"""
|
|
|
|
import asyncio
import sys
from bisect import bisect_right
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
|
|
|
|
# Put the directory three levels above this file on the import path so the
# `engine` package imports below resolve when the file is run as a script.
_project_root = Path(__file__).resolve().parent.parent.parent
if not (str(_project_root) in sys.path):
    sys.path.insert(0, str(_project_root))
|
|
|
|
from engine.common.base import BaseStrategy, Signal, StrategyConfig
|
|
from engine.common.models import Kline
|
|
from engine.common.config import config
|
|
from engine.backtest import BacktestEngine, BacktestConfig
|
|
from engine.data import DataService
|
|
from engine.indicators import ema, atr, rsi as calc_rsi
|
|
|
|
|
|
# The trading universe: every symbol here is loaded and ranked against the
# others, and each one is back-tested in turn by main().
ALL_SYMBOLS = [
    "BTCUSDT",
    "ETHUSDT",
    "BNBUSDT",
    "SOLUSDT",
]
|
|
|
|
|
|
class CrossSectionConfig(StrategyConfig):
    """Tunable parameters for the cross-sectional momentum strategy."""

    # --- ranking ---
    # Number of bars over which each symbol's return is measured for ranking.
    lookback: int = 20
    # Only symbols ranked within the top N may be bought (and held).
    rank_threshold: int = 2

    # --- trend signal (EMA crossover) ---
    ema_fast: int = 10
    ema_slow: int = 50

    # --- mean-reversion signal (RSI) ---
    rsi_period: int = 14
    # An RSI reading below this value counts as oversold.
    rsi_entry: float = 35.0

    # --- risk ---
    # Trailing-stop distance below the highest high, in multiples of ATR.
    atr_stop: float = 2.5

    # --- universe data window ---
    # Passed as start_time / end_time when loading every symbol's klines.
    data_start: Optional[datetime] = None
    data_end: Optional[datetime] = None
|
|
|
|
|
|
class CrossSectionStrategy(BaseStrategy):
    """Cross-sectional momentum: only trade the symbols that rank strongest.

    One instance trades a single symbol (``cfg.symbol``) but loads the 4h
    history of every symbol in ``ALL_SYMBOLS`` so that it can rank its own
    symbol against the rest on each bar.

    Entries require the symbol to rank within ``cfg.rank_threshold`` and
    either an EMA golden cross (trend) or an oversold RSI (mean reversion).
    Exits fire when the rank drops out, on an EMA death cross, or when the
    close falls below an ATR-based trailing stop.
    """

    strategy_type = "cross_section"

    # Score assigned to a symbol that has no bar at or before the requested
    # time; low enough that such a symbol always ranks last.
    _NO_DATA_SCORE = -999.0

    def __init__(self, c: CrossSectionConfig):
        super().__init__(c)
        self.cfg = c
        # Universe-wide history keyed by symbol, filled once in on_start().
        self._all_klines: dict[str, list[Kline]] = {}
        self._all_closes: dict[str, list[float]] = {}
        # Open times parallel to _all_klines, kept separately so the ranking
        # lookup can binary-search them.
        self._all_open_times: dict[str, list[float]] = {}
        # Rolling price series of the symbol this instance trades.
        self._closes: list[float] = []
        self._highs: list[float] = []
        self._lows: list[float] = []
        # Highest high seen since entry; anchor for the trailing stop.
        self._highest: float = 0.0
        self._in_position = False

    async def on_start(self):
        """Load the 4h klines of every symbol in the universe, then start."""
        ds = DataService(config.db)
        await ds.connect()
        try:
            for sym in ALL_SYMBOLS:
                fetched = await ds.fetch_klines(
                    symbol=sym, interval="4h",
                    start_time=self.cfg.data_start, end_time=self.cfg.data_end,
                    limit=1_000_000,
                )
                # The ranking lookup binary-searches by open time, which needs
                # ascending order; sorting already-ordered data is cheap.
                klines = sorted(fetched, key=lambda bar: bar.open_time)
                self._all_klines[sym] = klines
                self._all_closes[sym] = [bar.close for bar in klines]
                self._all_open_times[sym] = [bar.open_time for bar in klines]
        finally:
            await ds.close()
        await super().on_start()

    def _get_rank(self, ts: float) -> dict[str, float]:
        """Return ``{symbol: return %}`` over the lookback window ending at ``ts``.

        For each symbol the window ends at the latest bar whose open time is
        ``<= ts``. A symbol with no such bar (no data loaded, or its history
        starts after ``ts``) gets ``_NO_DATA_SCORE`` rather than being scored
        on bars from the future. A symbol with only one usable bar scores 0.
        """
        scores: dict[str, float] = {}
        lookback = self.cfg.lookback
        for sym in ALL_SYMBOLS:
            open_times = self._all_open_times.get(sym, [])
            # Index of the last bar with open_time <= ts, or -1 if none.
            idx = bisect_right(open_times, ts) - 1
            if idx < 0:
                scores[sym] = self._NO_DATA_SCORE
                continue

            # Early in the history the window is shortened to what exists.
            start_idx = max(0, idx - lookback)
            if start_idx >= idx:
                scores[sym] = 0.0
                continue

            closes = self._all_closes[sym]
            start_price = closes[start_idx]
            if start_price > 0:
                scores[sym] = (closes[idx] / start_price - 1) * 100
            else:
                scores[sym] = 0.0
        return scores

    def _my_rank(self, ts: float) -> int:
        """Return the traded symbol's rank in the universe (1 = strongest)."""
        scores = self._get_rank(ts)
        my_score = scores.get(self.cfg.symbol, self._NO_DATA_SCORE)
        # Rank is one more than the number of strictly better scores, so
        # symbols with equal scores share a rank.
        better = sum(1 for s in scores.values() if s > my_score)
        return better + 1

    async def on_kline(self, k: Kline) -> Optional[Signal]:
        """Process one bar of the traded symbol and return a signal, if any.

        Returns a SELL signal when an exit condition fires while in position,
        a BUY signal when an entry condition fires while flat and top-ranked,
        and ``None`` otherwise (including during indicator warm-up).
        """
        self._closes.append(k.close)
        self._highs.append(k.high)
        self._lows.append(k.low)
        # Wait for the slow EMA period plus a 10-bar margin before trading.
        if len(self._closes) < self.cfg.ema_slow + 10:
            return None

        fast = ema(self._closes, self.cfg.ema_fast)
        slow = ema(self._closes, self.cfg.ema_slow)
        atr_vals = atr(self._highs, self._lows, self._closes, 14)
        rsi_vals = calc_rsi(self._closes, self.cfg.rsi_period)

        cur_f, cur_s = fast[-1], slow[-1]
        prev_f, prev_s = fast[-2], slow[-2]
        cur_atr = atr_vals[-1]
        cur_rsi = rsi_vals[-1]
        # NOTE(review): a zero reading is treated as "indicator not ready";
        # presumably the indicator helpers pad warm-up values with 0 - confirm.
        if cur_f == 0 or cur_s == 0 or cur_atr == 0 or cur_rsi == 0:
            return None

        rank = self._my_rank(k.open_time)
        is_top = rank <= self.cfg.rank_threshold

        golden = prev_f <= prev_s and cur_f > cur_s   # trend entry
        death = prev_f >= prev_s and cur_f < cur_s    # trend exit
        oversold = cur_rsi < self.cfg.rsi_entry       # mean-reversion entry

        # -- exits --
        if self._in_position:
            self._highest = max(self._highest, k.high)
            stop = self._highest - self.cfg.atr_stop * cur_atr

            if not is_top:
                self._in_position = False
                return Signal(symbol=self.cfg.symbol, side="SELL",
                              reason=f"排名跌出前{self.cfg.rank_threshold}(#{rank})", timestamp=k.open_time)
            if death:
                self._in_position = False
                return Signal(symbol=self.cfg.symbol, side="SELL", reason="EMA死叉", timestamp=k.open_time)
            if k.close < stop:
                self._in_position = False
                return Signal(symbol=self.cfg.symbol, side="SELL", reason="ATR止损", timestamp=k.open_time)

        # -- entries --
        if not self._in_position and is_top:
            # Trend signal: golden cross.
            if golden:
                self._in_position = True
                self._highest = k.close
                return Signal(symbol=self.cfg.symbol, side="BUY",
                              reason=f"金叉+#{rank}横截面动量", timestamp=k.open_time)
            # Mean-reversion signal: oversold RSI.
            if oversold:
                self._in_position = True
                self._highest = k.close
                return Signal(symbol=self.cfg.symbol, side="BUY",
                              confidence=0.7,  # slightly smaller size for mean-reversion entries
                              reason=f"RSI超卖+#{rank}横截面动量 RSI={cur_rsi:.0f}",
                              timestamp=k.open_time)

        return None
|
|
|
|
|
|
# ═══════════════════════════════════════════════
|
|
|
|
# Backtest window shared by the strategy's data load and the backtest engine.
DATE_START = datetime(year=2024, month=1, day=1)
DATE_END = datetime(year=2026, month=1, day=1)
|
|
|
|
|
|
async def main():
    """Backtest the strategy on every symbol and print a comparison report.

    For each symbol in ``ALL_SYMBOLS`` this runs one 4h backtest over
    ``DATE_START``..``DATE_END``, prints its headline metrics, a breakdown of
    entry / exit reasons and the last two closed trades, then prints a table
    comparing the results with hard-coded pure trend-following figures.
    """
    heavy_rule = "═" * 110
    light_rule = "─" * 110

    print()
    print(heavy_rule)
    print(" 横截面动量 — 只做最强 + 趋势/回归双信号 | 4h | 2024-2026")
    print(heavy_rule)
    print(f" {'币种':<10} {'收益%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5} {'胜率%':>6} {'盈亏比':>6}")
    print(light_rule)

    results = {}
    for symbol in ALL_SYMBOLS:
        sc = CrossSectionConfig(symbol=symbol, data_start=DATE_START, data_end=DATE_END)
        bt = BacktestConfig(symbol=symbol, interval="4h",
                            start_time=DATE_START, end_time=DATE_END, initial_capital=10_000.0)
        engine = BacktestEngine(bt, db_config=config.db)
        r = await engine.run(CrossSectionStrategy, sc)
        m = r.metrics
        results[symbol] = (m, r)

        # Classify trades by the keywords the strategy puts in Signal.reason.
        trend_signals = sum(1 for t in r.trades if t.side == "BUY" and "金叉" in t.reason)
        meanrev_signals = sum(1 for t in r.trades if t.side == "BUY" and "RSI" in t.reason)
        exits_rank = sum(1 for t in r.trades if t.side == "SELL" and "排名" in t.reason)

        print(f" {symbol:<10} {m.total_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5} {m.win_rate*100:>5.1f}% {m.profit_factor:>6.2f}")
        print(f" {'':<10} └ 趋势入场:{trend_signals} 回归入场:{meanrev_signals} 排名出场:{exits_rank}")

        # Show the two most recent closed trades as a sanity check.
        sells = [t for t in r.trades if t.side == "SELL" and t.pnl is not None]
        for t in sells[-2:]:
            # Trade timestamps are divided by 1000 here, i.e. handled as epoch milliseconds.
            dt = datetime.fromtimestamp(t.timestamp / 1000, tz=timezone.utc).strftime("%m-%d %H:%M")
            print(f" {'':<10} └ {dt} {t.pnl:>+8.2f} {t.reason}")

    # -- comparison --
    print(light_rule)
    print("\n ■ 对比:纯趋势跟踪 vs 横截面动量")
    # Hard-coded reference figures: (strategy label, return %, Sharpe, trades).
    # NOTE(review): presumably copied from earlier pure trend-following runs - confirm.
    trend_baseline = {
        "BTCUSDT": ("EMA v3(10,50)", 39.9, 1.03, 20),
        "ETHUSDT": ("EMA v3(10,75)", 53.6, 1.04, 18),
        "BNBUSDT": ("EMA v1(20,50)", 52.0, 0.71, 41),
        "SOLUSDT": ("EMA v3(30,50)", 73.6, 1.18, 13),
    }
    print(f" {'币种':<10} {'纯趋势':>24} → {'横截面动量':>24}")
    print(f" {'':<10} {'收益% 夏普 交易':>24} → {'收益% 夏普 交易':>24}")
    for sym in ALL_SYMBOLS:
        _label, t_ret, t_sh, t_tr = trend_baseline[sym]
        m, _ = results[sym]
        print(f" {sym:<10} {t_ret:>5.1f}% {t_sh:>5.2f} {t_tr:>4}次 → {m.total_return_pct:>5.1f}% {m.sharpe_ratio:>5.2f} {m.total_trades:>4}次")

    # A blank line followed by one full-width rule. The previous
    # `"\n═" * 110` repeated the newline too, printing 110 one-character lines.
    print("\n" + heavy_rule)
|
|
|
|
|
|
# Script entry point: run the async report only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
|