Files
Rekey 515e61c517 feat(engine): 添加策略示例集(18 个 Demo)
- backtest_demo.py: 回测基础演示
- strategy_simple.py / three_ema.py / long_short.py: 基础策略(双均线/三均线/多空)
- strategy_optimize*.py (3 版本): 参数优化示例(网格搜索/贝叶斯/遗传算法)
- multi_tf_*.py (4 版本): 多时间框架策略(EMA200/多周期共振/混合信号)
- regime_*.py (4 版本): 市场状态检测(趋势/震荡/波动率区间/全状态)
- cross_section.py: 截面多品种策略
- factor_demo.py: 多因子模型演示
- strategy_battle.py / strategy_more.py: 策略对比与组合
- full_cycle.py: 全流程演示(数据→回测→分析)
- data.py: 数据读取示例
2026-06-12 10:27:04 +08:00

235 lines
9.2 KiB
Python

"""
横截面动量 — 选强弃弱 + 趋势/均值回归入场
策略:
1. 每根 4h K 线,计算 4 个币种过去 N 根 K 线的收益率
2. 按收益率排名,只有前 2 名允许做多
3. 趋势入场:EMA(10,50) 金叉 + 排名前2 → 买入
4. 回归入场:RSI < 35 + 排名前2 → 回调买入
5. 出场:排名跌出前2 或 EMA死叉 或 ATR止损
币种:BTC/ETH/BNB/SOL | 4h | 2024-2026
"""
import asyncio
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from engine.common.base import BaseStrategy, Signal, StrategyConfig
from engine.common.models import Kline
from engine.common.config import config
from engine.backtest import BacktestEngine, BacktestConfig
from engine.data import DataService
from engine.indicators import ema, atr, rsi as calc_rsi
ALL_SYMBOLS = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"]
class CrossSectionConfig(StrategyConfig):
lookback: int = 20 # 排名回溯周期
rank_threshold: int = 2 # 只做前N名
ema_fast: int = 10
ema_slow: int = 50
rsi_period: int = 14
rsi_entry: float = 35.0
atr_stop: float = 2.5
data_start: Optional[datetime] = None
data_end: Optional[datetime] = None
class CrossSectionStrategy(BaseStrategy):
"""横截面动量 — 只做强势币种"""
strategy_type = "cross_section"
def __init__(self, c: CrossSectionConfig):
super().__init__(c)
self.cfg = c
# 所有币种的数据 {symbol: [Kline]}
self._all_klines: dict[str, list[Kline]] = {}
self._all_closes: dict[str, list[float]] = {}
# 当前币种的数据
self._closes: list[float] = []
self._highs: list[float] = []
self._lows: list[float] = []
self._highest: float = 0.0
self._in_position = False
async def on_start(self):
from engine.common.config import config as app_config
ds = DataService(app_config.db)
await ds.connect()
try:
for sym in ALL_SYMBOLS:
klines = await ds.fetch_klines(
symbol=sym, interval="4h",
start_time=self.cfg.data_start, end_time=self.cfg.data_end,
limit=1_000_000,
)
self._all_klines[sym] = klines
self._all_closes[sym] = [k.close for k in klines]
finally:
await ds.close()
await super().on_start()
def _get_rank(self, ts: float) -> dict[str, float]:
"""计算所有币种在指定时间戳的排名收益率,返回 {symbol: return%}"""
scores = {}
for sym in ALL_SYMBOLS:
klines = self._all_klines.get(sym, [])
if not klines:
scores[sym] = -999
continue
# 找到时间戳 <= ts 的最新K线索引
idx = len(klines) - 1
for i in range(len(klines) - 1, -1, -1):
if klines[i].open_time <= ts:
idx = i
break
# 计算过去 lookback 根K线的收益率
start_idx = max(0, idx - self.cfg.lookback)
if start_idx >= idx:
scores[sym] = 0
else:
start_price = self._all_closes[sym][start_idx]
end_price = self._all_closes[sym][idx]
scores[sym] = (end_price / start_price - 1) * 100 if start_price > 0 else 0
return scores
def _my_rank(self, ts: float) -> int:
"""当前币种在全部币种中的排名(1=最强)"""
scores = self._get_rank(ts)
my_score = scores.get(self.cfg.symbol, -999)
# 高于我的分数有几个
better = sum(1 for s in scores.values() if s > my_score)
return better + 1
async def on_kline(self, k: Kline) -> Optional[Signal]:
self._closes.append(k.close)
self._highs.append(k.high)
self._lows.append(k.low)
n = len(self._closes)
if n < self.cfg.ema_slow + 10:
return None
fast = ema(self._closes, self.cfg.ema_fast)
slow = ema(self._closes, self.cfg.ema_slow)
atr_vals = atr(self._highs, self._lows, self._closes, 14)
rsi_vals = calc_rsi(self._closes, self.cfg.rsi_period)
cur_f, cur_s = fast[-1], slow[-1]
prev_f, prev_s = fast[-2], slow[-2]
cur_atr = atr_vals[-1]
cur_rsi = rsi_vals[-1]
if cur_f == 0 or cur_s == 0 or cur_atr == 0 or cur_rsi == 0:
return None
rank = self._my_rank(k.open_time)
is_top = rank <= self.cfg.rank_threshold
golden = prev_f <= prev_s and cur_f > cur_s # 趋势入场
death = prev_f >= prev_s and cur_f < cur_s # 趋势出场
oversold = cur_rsi < self.cfg.rsi_entry # 均值回归入场
# ── 出场 ──
if self._in_position:
self._highest = max(self._highest, k.high)
stop = self._highest - self.cfg.atr_stop * cur_atr
if not is_top:
self._in_position = False
return Signal(symbol=self.cfg.symbol, side="SELL",
reason=f"排名跌出前{self.cfg.rank_threshold}(#{rank})", timestamp=k.open_time)
if death:
self._in_position = False
return Signal(symbol=self.cfg.symbol, side="SELL", reason="EMA死叉", timestamp=k.open_time)
if k.close < stop:
self._in_position = False
return Signal(symbol=self.cfg.symbol, side="SELL", reason="ATR止损", timestamp=k.open_time)
# ── 入场 ──
if not self._in_position and is_top:
# 趋势信号:金叉
if golden:
self._in_position = True
self._highest = k.close
return Signal(symbol=self.cfg.symbol, side="BUY",
reason=f"金叉+#{rank}横截面动量", timestamp=k.open_time)
# 回归信号:RSI超卖
if oversold:
self._in_position = True
self._highest = k.close
return Signal(symbol=self.cfg.symbol, side="BUY",
confidence=0.7, # 回归信号稍降仓位
reason=f"RSI超卖+#{rank}横截面动量 RSI={cur_rsi:.0f}",
timestamp=k.open_time)
return None
# ═══════════════════════════════════════════════
DATE_START = datetime(2024, 1, 1)
DATE_END = datetime(2026, 1, 1)
async def main():
print()
print("" * 110)
print(" 横截面动量 — 只做最强 + 趋势/回归双信号 | 4h | 2024-2026")
print("" * 110)
print(f" {'币种':<10} {'收益%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5} {'胜率%':>6} {'盈亏比':>6}")
print("" * 110)
results = {}
for symbol in ALL_SYMBOLS:
sc = CrossSectionConfig(symbol=symbol, data_start=DATE_START, data_end=DATE_END)
bt = BacktestConfig(symbol=symbol, interval="4h",
start_time=DATE_START, end_time=DATE_END, initial_capital=10_000.0)
engine = BacktestEngine(bt, db_config=config.db)
r = await engine.run(CrossSectionStrategy, sc)
m = r.metrics
results[symbol] = (m, r)
# 统计排名分布和信号类型
trend_signals = sum(1 for t in r.trades if t.side == "BUY" and "金叉" in t.reason)
meanrev_signals = sum(1 for t in r.trades if t.side == "BUY" and "RSI" in t.reason)
exits_rank = sum(1 for t in r.trades if t.side == "SELL" and "排名" in t.reason)
print(f" {symbol:<10} {m.total_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5} {m.win_rate*100:>5.1f}% {m.profit_factor:>6.2f}")
print(f" {'':<10} └ 趋势入场:{trend_signals} 回归入场:{meanrev_signals} 排名出场:{exits_rank}")
sells = [t for t in r.trades if t.side == "SELL" and t.pnl is not None]
for t in sells[-2:]:
dt = datetime.fromtimestamp(t.timestamp / 1000, tz=timezone.utc).strftime("%m-%d %H:%M")
print(f" {'':<10}{dt} {t.pnl:>+8.2f} {t.reason}")
# ── 对比 ──
print("" * 110)
print("\n ■ 对比:纯趋势跟踪 vs 横截面动量")
TREND = {
"BTCUSDT": ("EMA v3(10,50)", 39.9, 1.03, 20),
"ETHUSDT": ("EMA v3(10,75)", 53.6, 1.04, 18),
"BNBUSDT": ("EMA v1(20,50)", 52.0, 0.71, 41),
"SOLUSDT": ("EMA v3(30,50)", 73.6, 1.18, 13),
}
print(f" {'币种':<10} {'纯趋势':>24}{'横截面动量':>24}")
print(f" {'':<10} {'收益% 夏普 交易':>24}{'收益% 夏普 交易':>24}")
for sym in ALL_SYMBOLS:
t_name, t_ret, t_sh, t_tr = TREND[sym]
m, r = results[sym]
print(f" {sym:<10} {t_ret:>5.1f}% {t_sh:>5.2f} {t_tr:>4}次 → {m.total_return_pct:>5.1f}% {m.sharpe_ratio:>5.2f} {m.total_trades:>4}")
print("\n" * 110)
if __name__ == "__main__":
asyncio.run(main())