feat: 全链路新增 type 字段支持 + exchange.ts 超时退出优化

- TS: exit 函数统一管理进程退出与 DB 连接关闭;10s 超时 + 异常路径 clearTimeout
- Python: PairType(spot/um/cm) 贯穿 Kline 模型、策略配置、数据查询
- 回测脚本升级: 9策略 × 4币种 × 6时间级别 × 2交易类型
- 新增 generate_report.py 回测报告生成工具
This commit is contained in:
Rekey
2026-06-17 10:01:52 +08:00
parent ebaef5042e
commit 626acb20d3
13 changed files with 42394 additions and 5129 deletions
+67 -59
View File
@@ -1,8 +1,8 @@
"""
2h/6h 策略全维度对比回测 — 9策略 × 4币种 × 2时间级别 × 4数据量
1h-1d 策略全维度对比回测 — 9策略 × 4币种 × 6时间级别 × 2交易类型 × 4数据量
8个网络知名策略 + 牛熊自适应策略(RegimeDetector3投票)
2h 和 6h 两个新时间级别上的表现对比。
1h/2h/4h/6h/8h/1d 六个时间级别上的表现对比。
用法:
source .venv/bin/activate && python example/comparison_2h_6h.py
@@ -31,7 +31,8 @@ from engine.example.long_short import LongShortEngine
# ── 全局常量 ──
SYMBOLS = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"]
TIMEFRAMES = ["2h", "6h"]
TIMEFRAMES = ["1h", "2h", "4h", "6h", "8h", "1d"]
PAIR_TYPES = ["spot", "um"]
INITIAL = 10_000.0
WARMUP = 150
MAX_CONCURRENCY = 6
@@ -820,12 +821,13 @@ STRATEGY_PARAMS_STR = {
# 执行
# ════════════════════════════════════════════════════════
async def run_one(entry, symbol, interval, period_label, start, end):
async def run_one(entry, symbol, interval, pair_type, period_label, start, end):
make_config = entry["make_config"]
strategy_cls = entry["strategy_cls"]
sc = make_config(symbol)
bt = BacktestConfig(
symbol=symbol, interval=interval,
type=pair_type,
start_time=start, end_time=end,
initial_capital=INITIAL, warmup_bars=WARMUP,
)
@@ -852,14 +854,15 @@ async def main():
date_ranges: dict[tuple[str, str], tuple] = {}
for symbol in SYMBOLS:
for tf in TIMEFRAMES:
try:
s, e = await ds.fetch_symbol_date_range(symbol, tf)
bar_ms = {"2h": 7_200_000, "6h": 21_600_000}
estimated_bars = int((e - s).total_seconds() * 1000 / bar_ms[tf])
date_ranges[(symbol, tf)] = (s, e, estimated_bars)
print(f" {symbol} {tf:<4}: {s.date()} ~ {e.date()} (约{estimated_bars:,}根)")
except Exception as ex:
print(f" {symbol} {tf:<4}: 获取失败 — {ex}")
for pair_type in PAIR_TYPES:
try:
s, e = await ds.fetch_symbol_date_range(symbol, tf, type=pair_type)
bar_ms = {"1h": 3_600_000, "2h": 7_200_000, "4h": 14_400_000, "6h": 21_600_000, "8h": 28_800_000, "1d": 86_400_000}
estimated_bars = int((e - s).total_seconds() * 1000 / bar_ms[tf])
date_ranges[(symbol, tf, pair_type)] = (s, e, estimated_bars)
print(f" {symbol} {tf:<4} {pair_type:<5}: {s.date()} ~ {e.date()} (约{estimated_bars:,}根)")
except Exception as ex:
print(f" {symbol} {tf:<4} {pair_type:<5}: 获取失败 — {ex}")
await ds.close()
@@ -869,39 +872,41 @@ async def main():
for strat_name, entry in STRATEGY_REGISTRY.items():
for symbol in SYMBOLS:
for tf in TIMEFRAMES:
key = (symbol, tf)
if key not in date_ranges:
continue
fs, fe, est_bars = date_ranges[key]
for period_label, (period_start, period_end) in PERIODS.items():
actual_start = period_start or fs
actual_end = period_end or fe
if actual_start >= actual_end:
for pair_type in PAIR_TYPES:
key = (symbol, tf, pair_type)
if key not in date_ranges:
continue
fs, fe, est_bars = date_ranges[key]
min_bars = MIN_BARS_FOR_PERIOD.get(period_label, 50)
actual_bars = est_bars
if period_label != "全量":
actual_bars = int((actual_end - actual_start).total_seconds() * 1000 / {
"2h": 7_200_000, "6h": 21_600_000
}[tf])
for period_label, (period_start, period_end) in PERIODS.items():
actual_start = period_start or fs
actual_end = period_end or fe
if actual_start >= actual_end:
continue
if actual_bars < min_bars:
continue
min_bars = MIN_BARS_FOR_PERIOD.get(period_label, 50)
actual_bars = est_bars
if period_label != "全量":
actual_bars = int((actual_end - actual_start).total_seconds() * 1000 / {
"1h": 3_600_000, "2h": 7_200_000, "4h": 14_400_000, "6h": 21_600_000, "8h": 28_800_000, "1d": 86_400_000
}[tf])
tasks_info.append({
"strat_name": strat_name,
"entry": entry,
"symbol": symbol,
"tf": tf,
"period_label": period_label,
"start": actual_start,
"end": actual_end,
})
if actual_bars < min_bars:
continue
tasks_info.append({
"strat_name": strat_name,
"entry": entry,
"symbol": symbol,
"tf": tf,
"pair_type": pair_type,
"period_label": period_label,
"start": actual_start,
"end": actual_end,
})
total = len(tasks_info)
print(f"\n{total} 组回测任务 (9策略×4币种×2时间×4数据量 - 跳过数据不足)")
print(f"\n{total} 组回测任务 (9策略×4币种×6时间×2类型×4数据量 - 跳过数据不足)")
results: list[dict] = []
completed = 0
@@ -911,7 +916,7 @@ async def main():
nonlocal completed, errors
async with sem:
r, elapsed, err = await run_one(
info["entry"], info["symbol"], info["tf"],
info["entry"], info["symbol"], info["tf"], info["pair_type"],
info["period_label"], info["start"], info["end"],
)
completed += 1
@@ -924,12 +929,13 @@ async def main():
else:
m = r.metrics
status = f"{m.annual_return_pct:+.1f}%/yr"
print(f" [{completed}/{total}] {info['strat_name']} {info['symbol']} {info['tf']} {info['period_label']} ({elapsed:.1f}s) {status}", flush=True)
print(f" [{completed}/{total}] {info['strat_name']} {info['symbol']} {info['tf']} {info['pair_type']} {info['period_label']} ({elapsed:.1f}s) {status}", flush=True)
row = {
"策略名": info["strat_name"],
"币种": info["symbol"],
"时间级别": info["tf"],
"类型": info["pair_type"],
"数据量": info["period_label"],
"策略类型": info["entry"]["strategy_cls"].strategy_type if r else "",
"策略参数": STRATEGY_PARAMS_STR.get(info["strat_name"], ""),
@@ -986,7 +992,7 @@ async def main():
# ── 打印完整表格 ──
print()
print("" * 195)
print(" 2h / 6h 全维度策略对比回测结果(9策略 × 4币种 × 2时间 × 4数据量)")
print(" 1h-1d 全维度策略对比回测结果(9策略 × 4币种 × 6时间 × 2类型 × 4数据量)")
print("" * 195)
print()
@@ -997,35 +1003,36 @@ async def main():
first = strat_results[0]
print(f"{strat_name} | 类型: {first['策略类型']} | {first['策略描述']}")
print(f" 参数: {first['策略参数']}")
print(f" {'币种':<10} {'时间':<5} {'数据量':<6} {'总收益%':>8} {'年化%':>8} {'夏普':>7} {'回撤%':>7} {'胜率%':>7} {'盈亏比':>7} {'交易':>6} {'日期范围':<24}")
print(" " + "" * 185)
print(f" {'币种':<10} {'时间':<5} {'类型':<5} {'数据量':<6} {'总收益%':>8} {'年化%':>8} {'夏普':>7} {'回撤%':>7} {'胜率%':>7} {'盈亏比':>7} {'交易':>6} {'日期范围':<24}")
print(" " + "" * 195)
strat_results.sort(key=lambda x: (SYMBOLS.index(x["币种"]), TIMEFRAMES.index(x["时间级别"]), list(PERIODS.keys()).index(x["数据量"])))
strat_results.sort(key=lambda x: (SYMBOLS.index(x["币种"]), TIMEFRAMES.index(x["时间级别"]), PAIR_TYPES.index(x["类型"]), list(PERIODS.keys()).index(x["数据量"])))
for r in strat_results:
print(f" {r['币种']:<10} {r['时间级别']:<5} {r['数据量']:<6} {r['总收益%']:>7.1f}% {r['年化收益%']:>7.1f}% {r['夏普比率']:>7.2f} {r['最大回撤%']:>7.1f}% {r['胜率%']:>6.1f}% {r['盈亏比']:>7.2f} {r['交易次数']:>6} {r['日期范围']:<24}")
print(f" {r['币种']:<10} {r['时间级别']:<5} {r['类型']:<5} {r['数据量']:<6} {r['总收益%']:>7.1f}% {r['年化收益%']:>7.1f}% {r['夏普比率']:>7.2f} {r['最大回撤%']:>7.1f}% {r['胜率%']:>6.1f}% {r['盈亏比']:>7.2f} {r['交易次数']:>6} {r['日期范围']:<24}")
print()
# ── 终极汇总 ──
print("" * 195)
print(" ■ 终极汇总:每组(时间级别+数据量)下各币种最佳策略(按年化收益)")
print(" ■ 终极汇总:每组(时间级别+类型+数据量)下各币种最佳策略(按年化收益)\n(1h结果可能因数据量过大导致内存/耗时问题,已设MAX_CONCURRENCY=6")
print("" * 195)
print()
for tf in TIMEFRAMES:
for period_label in PERIODS:
subset = [r for r in results if r["时间级别"] == tf and r["数据量"] == period_label and r.get("总收益%", 0) != 0]
if not subset:
continue
subset.sort(key=lambda x: x.get("年化收益%", -9999), reverse=True)
for pair_type in PAIR_TYPES:
for period_label in PERIODS:
subset = [r for r in results if r["时间级别"] == tf and r["类型"] == pair_type and r["数据量"] == period_label and r.get("总收益%", 0) != 0]
if not subset:
continue
subset.sort(key=lambda x: x.get("年化收益%", -9999), reverse=True)
print(f"{tf} | {period_label}")
print(f" {'排名':<5} {'策略名':<22} {'币种':<10} {'总收益%':>8} {'年化%':>8} {'夏普':>7} {'回撤%':>7} {'胜率%':>7} {'盈亏比':>7} {'交易':>6}")
print(" " + "" * 130)
for i, r in enumerate(subset[:5]):
marker = ["🥇", "🥈", "🥉", " 4", " 5"][i]
print(f" {marker:<5} {r['策略名']:<22} {r['币种']:<10} {r['总收益%']:>7.1f}% {r['年化收益%']:>7.1f}% {r['夏普比率']:>7.2f} {r['最大回撤%']:>7.1f}% {r['胜率%']:>6.1f}% {r['盈亏比']:>7.2f} {r['交易次数']:>6}")
print()
print(f"{tf} | {pair_type} | {period_label}")
print(f" {'排名':<5} {'策略名':<22} {'币种':<10} {'总收益%':>8} {'年化%':>8} {'夏普':>7} {'回撤%':>7} {'胜率%':>7} {'盈亏比':>7} {'交易':>6}")
print(" " + "" * 135)
for i, r in enumerate(subset[:5]):
marker = ["🥇", "🥈", "🥉", " 4", " 5"][i]
print(f" {marker:<5} {r['策略名']:<22} {r['币种']:<10} {r['总收益%']:>7.1f}% {r['年化收益%']:>7.1f}% {r['夏普比率']:>7.2f} {r['最大回撤%']:>7.1f}% {r['胜率%']:>6.1f}% {r['盈亏比']:>7.2f} {r['交易次数']:>6}")
print()
# ── 保存 JSON ──
output_file = _project_root / "engine" / "example" / "comparison_2h_6h_result.json"
@@ -1034,6 +1041,7 @@ async def main():
"config": {
"symbols": SYMBOLS,
"timeframes": TIMEFRAMES,
"pair_types": PAIR_TYPES,
"periods": list(PERIODS.keys()),
"initial_capital": INITIAL,
"warmup_bars": WARMUP,