Files
Rekey edc50e8809 feat: 新增2h/6h时间框架支持,策略重构为增量指标计算
- 数据层: build_aggregates_sql 新增 2h/6h 聚合视图,默认起始时间调整为 2017-05
- 模型层: KlineInterval 类型扩展 2h/6h,DataService 新增对应表名和毫秒映射
- 指标层: 新增 incremental.py 增量指标模块 (EmaInc/AtrInc/RsiInc/BbInc),O(1) per bar
- 策略重构: long_short.py 和 regime_all.py 从批量 ema/atr 迁移至增量指标,避免每 bar 重复全量计算
- regime 探测器: RegimeDetector3 改为增量 EMA200,detect() 接口简化
- 回测扩展: regime_timeframe_comparison 从 4h/1d 扩展至 2h/4h/6h/1d
- 新增示例: multi_strategy_report, vol_break_compare/periods, intraday_explore, top3_trades 等分析脚本
2026-06-13 19:30:25 +08:00

509 lines
22 KiB
Python

"""
多空双向回测 — EMA 趋势跟踪(支持做空)
基于表现最好的纯趋势参数,增加做空能力:
- 金叉 → 平空仓 + 做多
- 死叉 → 平多仓 + 做空
- ATR 动态止损(多空双向)
- 始终持仓(非多即空)
- 输出增加年化收益
参数(各币种历史最优):
BTC(10,50) ETH(10,75) BNB(20,50) SOL(30,50)
用法:
source .venv/bin/activate && python example/long_short.py
"""
import asyncio
import statistics
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Type
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from engine.common.base import BaseStrategy, Signal, StrategyConfig
from engine.common.models import Kline
from engine.common.config import config, DBConfig
from engine.data import DataService
from engine.indicators.incremental import EmaInc, AtrInc
from engine.backtest.models import BacktestConfig, BacktestMetrics, BacktestResult, BacktestTrade
# ════════════════════════════════════════════════════════
# 多空回测引擎
# ════════════════════════════════════════════════════════
class LongShortEngine:
    """Event-driven backtest engine that can hold long or short positions.

    A signal returned by the strategy on one bar is queued and filled at the
    open of the following bar, adjusted by ``slippage_pct``. ``_position`` is
    signed: positive means long, negative means short, zero means flat.

    NOTE(review): when ``signal.quantity`` is None and an opposite position is
    open, the order is sized to that position, so it only closes it and opens
    nothing in the new direction. A strategy that wants to reverse must pass
    an explicit quantity larger than the open position.
    """

    def __init__(self, bt_config: BacktestConfig, db_config=None):
        """Store the backtest configuration and initialise empty account state.

        Args:
            bt_config: Backtest settings (symbol, interval, time range,
                capital, fees, slippage, warm-up length, minimum order size).
            db_config: Optional database configuration; when falsy, ``run``
                falls back to the application-wide ``config.db``.
        """
        self.config = bt_config
        self._db_config = db_config
        self._cash: float = bt_config.initial_capital
        self._position: float = 0.0  # >0 long, <0 short, 0 flat
        self._avg_entry_price: float = 0.0
        self._trades: list[BacktestTrade] = []
        self._equity: list[dict] = []
        self._peak_equity: float = 0.0
        self._pending_buy: Optional[Signal] = None
        self._pending_sell: Optional[Signal] = None

    def _reset_state(self) -> None:
        """Restore the account to its initial state so ``run`` can be repeated."""
        self._cash = self.config.initial_capital
        self._position = 0.0
        self._avg_entry_price = 0.0
        self._trades = []
        self._equity = []
        self._peak_equity = 0.0
        self._pending_buy = None
        self._pending_sell = None

    async def run(self, strategy_cls, strategy_config: StrategyConfig) -> BacktestResult:
        """Load klines, replay them through the strategy and return the result.

        The first ``warmup_bars`` klines are fed to the strategy with their
        signals discarded. After that, each bar first fills the order queued
        on the previous bar, then asks the strategy for a new signal, then
        records an equity point. Any position still open after the last bar is
        closed against that last bar.

        Args:
            strategy_cls: Strategy class; instantiated with ``strategy_config``.
            strategy_config: Strategy settings; ``symbol`` and ``exchange``
                are overwritten with the values from the backtest config.

        Returns:
            A ``BacktestResult`` holding metrics, trades and the equity curve.

        Raises:
            ValueError: If fewer than ``warmup_bars + 2`` klines are available.
        """
        from engine.common.config import config as app_config

        strategy_config.symbol = self.config.symbol
        strategy_config.exchange = self.config.exchange
        db_cfg = self._db_config or app_config.db
        ds = DataService(db_cfg)
        await ds.connect()
        try:
            klines = await ds.fetch_klines(
                symbol=self.config.symbol, interval=self.config.interval,
                start_time=self.config.start_time, end_time=self.config.end_time,
                limit=1_000_000,
            )
            if len(klines) < self.config.warmup_bars + 2:
                raise ValueError(f"数据不足:需 {self.config.warmup_bars+2},实际 {len(klines)}")
            strategy = strategy_cls(strategy_config)
            await strategy.on_start()
            self._reset_state()

            warmup_end = self.config.warmup_bars
            # Warm-up: the strategy sees the bars, but returned signals are dropped.
            for i in range(warmup_end):
                await strategy.on_kline(klines[i])

            for i in range(warmup_end, len(klines)):
                kline = klines[i]
                # Fill the order queued on the previous bar before the
                # strategy gets to see the current one.
                if self._pending_buy is not None:
                    self._execute_buy(self._pending_buy, kline)
                    self._pending_buy = None
                if self._pending_sell is not None:
                    self._execute_sell(self._pending_sell, kline)
                    self._pending_sell = None

                signal = await strategy.on_kline(kline)
                if signal is not None:
                    if signal.side == "BUY":
                        self._pending_buy = signal
                    elif signal.side == "SELL":
                        self._pending_sell = signal
                self._record_equity(kline)

            # Forced liquidation of whatever is still open.
            if self._position != 0 and len(klines) > 0:
                last_k = klines[-1]
                if self._position > 0:
                    self._execute_sell(Signal(symbol=self.config.symbol, side="SELL",
                                              quantity=abs(self._position),
                                              reason="回测结束—强平多仓", timestamp=last_k.open_time), last_k)
                else:
                    self._execute_buy(Signal(symbol=self.config.symbol, side="BUY",
                                             quantity=abs(self._position),
                                             reason="回测结束—强平空仓", timestamp=last_k.open_time), last_k)
            await strategy.on_stop()
            metrics = self._compute_metrics()
            return BacktestResult(config=self.config, strategy_config=strategy_config.model_dump(),
                                  metrics=metrics, trades=self._trades, equity_curve=self._equity)
        finally:
            await ds.close()

    # ── Order execution ──
    def _execute_buy(self, signal: Signal, kline: Kline) -> None:
        """Fill a BUY at the bar open plus slippage.

        An open short is covered first; any quantity left over opens (or adds
        to) a long. When ``signal.quantity`` is None the size defaults to the
        open short, or to ``cash * signal.confidence`` when not short. Orders
        below ``min_order_qty`` are ignored.
        """
        exec_price = kline.open * (1 + self.config.slippage_pct)
        qty = signal.quantity
        if qty is None:
            if self._position < 0:
                qty = abs(self._position)  # cover the whole short
            else:
                max_notional = self._cash * signal.confidence
                qty = max_notional / exec_price
        qty = self._round_qty(qty)
        if qty < self.config.min_order_qty:
            return

        if self._position >= 0:
            # Open a long or add to an existing one.
            self._open_long(qty, exec_price, kline, signal)
            return

        # Cover the short.
        cover_qty = min(qty, abs(self._position))
        cover_notional = exec_price * cover_qty
        cover_comm = cover_notional * self.config.commission_pct
        pnl = (self._avg_entry_price - exec_price) * cover_qty - cover_comm
        self._cash -= cover_notional + cover_comm
        self._position += cover_qty
        if abs(self._position) < self.config.min_order_qty:
            # Treat sub-minimum residue as fully closed.
            self._position = 0.0
            self._avg_entry_price = 0.0
        self._trades.append(BacktestTrade(timestamp=kline.open_time, symbol=self.config.symbol,
                                          side="BUY", price=exec_price, quantity=cover_qty,
                                          notional=cover_notional, commission=cover_comm,
                                          slippage=exec_price - kline.open, pnl=pnl,
                                          reason=signal.reason))
        # Whatever exceeds the short opens a long.
        remaining = qty - cover_qty
        if remaining >= self.config.min_order_qty:
            self._open_long(remaining, exec_price, kline, signal)

    def _open_long(self, qty: float, exec_price: float, kline: Kline, signal: Signal):
        """Open or extend a long, shrinking the order to what cash can pay for."""
        fee_rate = self.config.commission_pct
        notional = exec_price * qty
        commission = notional * fee_rate
        total_cost = notional + commission
        if total_cost > self._cash:
            # Not enough cash: buy the largest quantity that fits including fees.
            qty = self._round_qty(self._cash / (exec_price * (1 + fee_rate)))
            if qty < self.config.min_order_qty:
                return
            notional = exec_price * qty
            commission = notional * fee_rate
            total_cost = notional + commission
        if self._position > 0:
            # Volume-weighted average entry across the old and new lots.
            total_value = self._avg_entry_price * self._position + notional
            self._position += qty
            self._avg_entry_price = total_value / self._position
        else:
            self._position = qty
            self._avg_entry_price = exec_price
        self._cash -= total_cost
        self._trades.append(BacktestTrade(timestamp=kline.open_time, symbol=self.config.symbol,
                                          side="BUY", price=exec_price, quantity=qty,
                                          notional=notional, commission=commission,
                                          slippage=exec_price - kline.open, reason=signal.reason))

    def _execute_sell(self, signal: Signal, kline: Kline) -> None:
        """Fill a SELL at the bar open minus slippage.

        Mirrors ``_execute_buy``: an open long is closed first and any
        quantity left over opens (or adds to) a short. The fill uses the bar
        *open*, the same reference price as buys, because queued orders are
        executed before the strategy has seen the bar.
        """
        exec_price = kline.open * (1 - self.config.slippage_pct)
        qty = signal.quantity
        if qty is None:
            if self._position > 0:
                qty = self._position  # close the whole long
            else:
                max_notional = self._cash * signal.confidence
                qty = max_notional / exec_price
        qty = self._round_qty(qty)
        if qty < self.config.min_order_qty:
            return

        if self._position <= 0:
            # Open a short or add to an existing one.
            self._open_short(qty, exec_price, kline, signal)
            return

        # Close the long.
        close_qty = min(qty, self._position)
        close_notional = exec_price * close_qty
        close_comm = close_notional * self.config.commission_pct
        pnl = (exec_price - self._avg_entry_price) * close_qty - close_comm
        self._position -= close_qty
        self._cash += close_notional - close_comm
        if self._position < self.config.min_order_qty:
            # Treat sub-minimum residue as fully closed.
            self._position = 0.0
            self._avg_entry_price = 0.0
        self._trades.append(BacktestTrade(timestamp=kline.open_time, symbol=self.config.symbol,
                                          side="SELL", price=exec_price, quantity=close_qty,
                                          notional=close_notional, commission=close_comm,
                                          slippage=kline.open - exec_price, pnl=pnl,
                                          reason=signal.reason))
        # Whatever exceeds the long opens a short.
        remaining = qty - close_qty
        if remaining >= self.config.min_order_qty:
            self._open_short(remaining, exec_price, kline, signal)

    def _open_short(self, qty: float, exec_price: float, kline: Kline, signal: Signal):
        """Open or extend a short; sale proceeds minus commission are credited to cash.

        No margin or cash check is applied to the short size here.
        """
        notional = exec_price * qty
        commission = notional * self.config.commission_pct
        if self._position < 0:
            # Volume-weighted average entry across the old and new lots.
            total_value = self._avg_entry_price * abs(self._position) + notional
            self._position -= qty
            self._avg_entry_price = total_value / abs(self._position)
        else:
            self._position = -qty
            self._avg_entry_price = exec_price
        self._cash += notional - commission
        self._trades.append(BacktestTrade(timestamp=kline.open_time, symbol=self.config.symbol,
                                          side="SELL", price=exec_price, quantity=qty,
                                          notional=notional, commission=commission,
                                          slippage=kline.open - exec_price, reason=signal.reason))

    # ── Equity curve ──
    def _record_equity(self, kline: Kline) -> None:
        """Append a mark-to-market equity point valued at the bar close.

        Because ``_position`` is signed, a short contributes a negative term
        that offsets the sale proceeds already held in cash.
        """
        equity = self._cash + self._position * kline.close
        if not self._equity or equity > self._peak_equity:
            self._peak_equity = equity
        if self._peak_equity > 0:
            dd = (equity - self._peak_equity) / self._peak_equity * 100
        else:
            dd = 0.0
        self._equity.append({"timestamp": kline.open_time, "equity": equity,
                             "drawdown": dd, "position": self._position})

    # ── Performance ──
    def _compute_metrics(self) -> BacktestMetrics:
        """Summarise the equity curve and closed trades into ``BacktestMetrics``.

        Only trades carrying a ``pnl`` (position-reducing fills) count as
        trades for win rate, profit factor and per-trade statistics.
        """
        if not self._equity:
            return BacktestMetrics()

        initial = self.config.initial_capital
        final = self._equity[-1]["equity"]
        total_return_pct = (final - initial) / initial * 100

        # Equity timestamps are treated as epoch milliseconds.
        first_ts = self._equity[0]["timestamp"]
        last_ts = self._equity[-1]["timestamp"]
        days = (last_ts - first_ts) / (1000 * 86400)
        if days > 0 and final > 0 and initial > 0:
            annual_return_pct = ((final / initial) ** (365 / days) - 1) * 100
        else:
            annual_return_pct = 0.0

        daily_returns = self._compute_daily_returns()
        sharpe_ratio = 0.0
        if len(daily_returns) > 1:
            mean_ret = statistics.mean(daily_returns)
            std_ret = statistics.stdev(daily_returns)
            if std_ret > 0:
                # Annualised with 365 periods per year.
                sharpe_ratio = mean_ret / std_ret * (365 ** 0.5)

        max_dd_pct, max_dd_days = self._compute_max_drawdown()

        closed = [t for t in self._trades if t.pnl is not None]
        total_trades = len(closed)
        if total_trades > 0:
            pnls = [t.pnl for t in closed]
            gross_profit = sum(p for p in pnls if p > 0)
            gross_loss = abs(sum(p for p in pnls if p <= 0))
            win_rate = sum(1 for p in pnls if p > 0) / total_trades
            if gross_loss > 0:
                profit_factor = gross_profit / gross_loss
            else:
                # No losing trades: report gross profit itself (0.0 if none).
                profit_factor = gross_profit if gross_profit > 0 else 0.0
            avg_pnl = sum(pnls) / total_trades
            best_pnl = max(pnls)
            worst_pnl = min(pnls)
        else:
            win_rate = profit_factor = avg_pnl = best_pnl = worst_pnl = 0.0

        calmar = annual_return_pct / abs(max_dd_pct) if max_dd_pct < 0 else 0.0
        return BacktestMetrics(
            total_return_pct=total_return_pct, annual_return_pct=annual_return_pct,
            sharpe_ratio=sharpe_ratio, max_drawdown_pct=max_dd_pct,
            max_drawdown_duration_days=max_dd_days, win_rate=win_rate,
            profit_factor=profit_factor, total_trades=total_trades,
            avg_trade_pnl=avg_pnl, best_trade_pnl=best_pnl, worst_trade_pnl=worst_pnl,
            calmar_ratio=calmar, final_equity=final,
        )

    def _compute_daily_returns(self) -> list[float]:
        """Return day-over-day returns using the last equity point of each UTC day."""
        if not self._equity:
            return []
        daily: dict[str, float] = {}
        for point in self._equity:
            dt = datetime.fromtimestamp(point["timestamp"] / 1000, tz=timezone.utc)
            # Later points on the same date overwrite earlier ones.
            daily[dt.strftime("%Y-%m-%d")] = point["equity"]
        closes = [daily[date] for date in sorted(daily)]
        return [(curr - prev) / prev for prev, curr in zip(closes, closes[1:]) if prev > 0]

    def _compute_max_drawdown(self) -> tuple[float, int]:
        """Return ``(max drawdown in percent (<= 0), days from peak)``.

        The day count is the largest peak-to-point distance seen at any moment
        the drawdown set a new low. Points are skipped while the running peak
        is not positive, since a percentage drawdown is undefined there.
        """
        if not self._equity:
            return 0.0, 0
        peak = self._equity[0]["equity"]
        max_dd = 0.0
        dd_start_idx = 0
        max_dd_days = 0
        for i, point in enumerate(self._equity):
            equity = point["equity"]
            if equity > peak:
                peak = equity
                dd_start_idx = i
            if peak <= 0:
                continue  # avoid dividing by a zero/negative peak
            dd = (equity - peak) / peak * 100
            if dd < max_dd:
                max_dd = dd
                peak_ts = self._equity[dd_start_idx]["timestamp"]
                dd_days = int((point["timestamp"] - peak_ts) / (1000 * 86400))
                if dd_days > max_dd_days:
                    max_dd_days = dd_days
        return max_dd, max_dd_days

    @staticmethod
    def _round_qty(qty: float, decimals: int = 8) -> float:
        """Truncate ``qty`` toward zero to ``decimals`` decimal places."""
        factor = 10 ** decimals
        return int(qty * factor) / factor
# ════════════════════════════════════════════════════════
# 多空趋势策略
# ════════════════════════════════════════════════════════
class LongShortEmaConfig(StrategyConfig):
    """Parameters of the long/short EMA crossover strategy."""

    fast: int = 10          # period of the fast EMA
    slow: int = 50          # period of the slow EMA
    atr_stop: float = 2.5   # trailing-stop distance, in multiples of ATR
    atr_period: int = 14    # ATR look-back (previously hard-coded to 14)


class LongShortEmaStrategy(BaseStrategy):
    """Go long on an EMA golden cross and short on a death cross.

    An ATR trailing stop can take the strategy flat until the next crossover.
    All indicators are updated incrementally, one bar at a time.
    """

    strategy_type = "long_short_ema"

    def __init__(self, c: LongShortEmaConfig):
        """Create the incremental indicators and reset the position state."""
        super().__init__(c)
        self.cfg = c
        # Only the number of bars seen is needed for the warm-up check, so no
        # price history is kept here.
        self._bar_count: int = 0
        self._ema_fast = EmaInc(c.fast)
        self._ema_slow = EmaInc(c.slow)
        self._atr = AtrInc(c.atr_period)
        self._highest: float = 0.0            # highest high since the long was opened
        self._lowest: float = float('inf')    # lowest low since the short was opened
        self._position_side: str = ""         # "long" / "short" / "" (flat)

    def _signal(self, side: str, reason: str, k: Kline) -> Signal:
        """Build a signal for the configured symbol stamped with the bar's open time."""
        return Signal(symbol=self.cfg.symbol, side=side, reason=reason, timestamp=k.open_time)

    async def on_kline(self, k: Kline) -> Optional[Signal]:
        """Consume one bar and return a BUY/SELL signal, or None.

        While long (short), a death (golden) cross reverses the position and
        takes priority over the ATR trailing stop, which exits to flat. On
        every entry, including reversals, the trailing extreme is re-seeded
        from the current close so the new position never inherits a stale
        high/low from an earlier one.
        """
        self._bar_count += 1
        # Indicators are updated on every bar, including the early bars that
        # cannot produce a signal yet, so their state is ready afterwards.
        self._ema_fast.update(k.close)
        self._ema_slow.update(k.close)
        self._atr.update(k.high, k.low, k.close)
        if self._bar_count < self.cfg.slow + 5:
            return None

        cur_f, cur_s = self._ema_fast[-1], self._ema_slow[-1]
        cur_atr = self._atr[-1]
        prev_f, prev_s = self._ema_fast[-2], self._ema_slow[-2]
        # NOTE(review): a zero value is taken to mean "indicator not ready" —
        # confirm against EmaInc / AtrInc.
        if cur_f == 0 or cur_s == 0 or cur_atr == 0:
            return None

        golden = prev_f <= prev_s and cur_f > cur_s
        death = prev_f >= prev_s and cur_f < cur_s

        # ── Holding a long ──
        if self._position_side == "long":
            self._highest = max(self._highest, k.high)
            stop = self._highest - self.cfg.atr_stop * cur_atr
            if death:
                self._position_side = "short"
                self._lowest = k.close  # fresh trailing low for the new short
                return self._signal("SELL", "EMA死叉→做空", k)
            if k.close < stop:
                self._position_side = ""
                return self._signal("SELL", "ATR止损→空仓", k)
        # ── Holding a short ──
        elif self._position_side == "short":
            self._lowest = min(self._lowest, k.low)
            stop = self._lowest + self.cfg.atr_stop * cur_atr
            if golden:
                self._position_side = "long"
                self._highest = k.close  # fresh trailing high for the new long
                return self._signal("BUY", "EMA金叉→做多", k)
            if k.close > stop:
                self._position_side = ""
                return self._signal("BUY", "ATR止损→空仓", k)
        # ── Flat, waiting for a crossover ──
        else:
            if golden:
                self._position_side = "long"
                self._highest = k.close
                return self._signal("BUY", "金叉→做多", k)
            if death:
                self._position_side = "short"
                self._lowest = k.close
                return self._signal("SELL", "死叉→做空", k)
        return None
# ════════════════════════════════════════════════════════
# Symbols backtested by main(), in report order.
SYMBOLS = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"]
# Per-symbol EMA periods, unpacked in main() as (fast, slow).
# Described by the author as the historical best for each symbol.
PARAMS = {
"BTCUSDT": (10, 50),
"ETHUSDT": (10, 75),
"BNBUSDT": (20, 50),
"SOLUSDT": (30, 50),
}
# Long-only results, printed next to the long/short results for comparison.
# Tuple layout: (total return %, Sharpe, drawdown %, annualized return %)
LONG_ONLY = {
"BTCUSDT": (39.9, 1.03, -11.5, 18.3),
"ETHUSDT": (53.6, 1.04, -15.3, 23.9),
"BNBUSDT": (52.0, 0.71, -39.8, 23.3),
"SOLUSDT": (73.6, 1.18, -25.7, 31.7),
}
# Backtest window handed to BacktestConfig as start_time / end_time.
# These are naive datetimes (no tzinfo).
DATE_START = datetime(2024, 1, 1)
DATE_END = datetime(2026, 1, 1)
async def main():
    """Backtest every symbol in SYMBOLS and print a comparison report.

    For each symbol the long/short result is printed next to the stored
    long-only figures, followed by the realised P&L split by side and those of
    the last two fills that closed part of a position.
    """
    interval = "4h"
    width = 112
    heavy_rule = "═" * width
    light_rule = "─" * width

    print()
    print(heavy_rule)
    # Interval and years come from the values actually passed to the backtest.
    print(f" 多空双向 EMA 趋势跟踪 | {interval} | {DATE_START.year}-{DATE_END.year}")
    print(heavy_rule)
    header = f" {'币种':<10} {'方向':<6} {'总收益%':>7} {'年化%':>7} {'夏普':>6} {'回撤%':>7} {'交易':>5} {'胜率%':>6} {'盈亏比':>6}"
    print(header)
    print(light_rule)

    for symbol in SYMBOLS:
        fast, slow = PARAMS[symbol]
        sc = LongShortEmaConfig(symbol=symbol, fast=fast, slow=slow)
        bt = BacktestConfig(symbol=symbol, interval=interval,
                            start_time=DATE_START, end_time=DATE_END, initial_capital=10_000.0)
        engine = LongShortEngine(bt, db_config=config.db)
        r = await engine.run(LongShortEmaStrategy, sc)
        m = r.metrics

        # A SELL carrying pnl closed a long; a BUY carrying pnl covered a short.
        long_trades = [t for t in r.trades if t.pnl is not None and t.side == "SELL"]
        short_trades = [t for t in r.trades if t.pnl is not None and t.side == "BUY"]
        long_pnl = sum(t.pnl for t in long_trades)
        short_pnl = sum(t.pnl for t in short_trades)
        lo = LONG_ONLY[symbol]

        print(f" {symbol:<10} 多空 {m.total_return_pct:>6.1f}% {m.annual_return_pct:>6.1f}% {m.sharpe_ratio:>6.2f} {m.max_drawdown_pct:>6.1f}% {m.total_trades:>5} {m.win_rate*100:>5.1f}% {m.profit_factor:>6.2f}")
        print(f" {'':<10} 只做多 {lo[0]:>6.1f}% {lo[3]:>6.1f}% {lo[1]:>6.2f} {lo[2]:>6.1f}%")
        if long_trades or short_trades:
            print(f" {'':<10} └ 多头P&L {long_pnl:>+7.0f} ({len(long_trades)}笔) 空头P&L {short_pnl:>+7.0f} ({len(short_trades)}笔)")
            for t in r.trades[-2:]:
                if t.pnl is not None:
                    side_label = "平多" if t.side == "SELL" else "平空"
                    dt = datetime.fromtimestamp(t.timestamp / 1000, tz=timezone.utc).strftime("%m-%d %H:%M")
                    print(f" {'':<10}{dt} {side_label} {t.pnl:>+8.2f} {t.reason}")
        print(light_rule)

    print()


if __name__ == "__main__":
    asyncio.run(main())