4d66a86234
- AGENTS.md: AI Agent 规则,定义项目运行环境、架构约定、常用命令 - ENGINE.md: Python 引擎架构设计文档,涵盖策略管理、信号总线、回测引擎等模块设计
1338 lines
46 KiB
Markdown
1338 lines
46 KiB
Markdown
# 策略引擎开发文档
|
||
|
||
> 数字货币量化交易系统 — 策略引擎模块(Python 3.10+)设计与开发指南
|
||
|
||
---
|
||
|
||
## 目录
|
||
|
||
1. [概述与定位](#概述与定位)
|
||
2. [核心架构](#核心架构)
|
||
3. [策略基类设计](#策略基类设计)
|
||
4. [策略管理器](#策略管理器)
|
||
5. [信号分发器](#信号分发器)
|
||
6. [回测引擎](#回测引擎)
|
||
7. [参数优化器](#参数优化器)
|
||
8. [数据流与生命周期](#数据流与生命周期)
|
||
9. [与上下游模块的交互](#与上下游模块的交互)
|
||
10. [目录结构](#目录结构)
|
||
11. [开发步骤](#开发步骤)
|
||
12. [实现规范](#实现规范)
|
||
|
||
---
|
||
|
||
## 概述与定位
|
||
|
||
策略引擎是整个量化交易系统的**核心调度中心**,位于 Python 业务层,负责:
|
||
|
||
- **策略生命周期管理**:注册、热加载、启动、暂停、停止、卸载策略
|
||
- **实时信号生成**:消费 TS 数据模块推送的行情数据,驱动策略产生交易信号
|
||
- **回测验证**:使用历史数据模拟策略执行,评估收益、回撤、夏普比率等指标
|
||
- **参数搜索**:通过网格搜索或贝叶斯优化自动寻找最优策略参数
|
||
- **下游集成**:将交易信号通过信号分发器路由到交易执行模块和风控模块
|
||
|
||
### 在系统中的位置
|
||
|
||
```
|
||
┌──────────────────────────┐
|
||
│ 策略引擎 🐍 Python │
|
||
│ │
|
||
Redis Pub/Sub ──────► │ ┌────────────────────┐ │
|
||
行情数据 │ │ 信号分发器 │──► executor/
|
||
│ └────────────────────┘ │
|
||
TimescaleDB ◄─────── │ ┌────────────────────┐ │
|
||
历史数据(回测) │ │ 策略管理器 │ │
|
||
│ │ ┌──────┬──────┐ │ │
|
||
│ │ │策略A │策略B │ │ │
|
||
│ │ └──────┴──────┘ │ │
|
||
│ └────────────────────┘ │
|
||
│ ┌────────────────────┐ │
|
||
│ │ 回测引擎 │ │
|
||
│ └────────────────────┘ │
|
||
│ ┌────────────────────┐ │
|
||
│ │ 参数优化器 │ │
|
||
│ └────────────────────┘ │
|
||
└──────────────────────────┘
|
||
```
|
||
|
||
---
|
||
|
||
## 核心架构
|
||
|
||
### 模块依赖图
|
||
|
||
```
|
||
┌──────────────────┐
|
||
│ engine/common/ │ ← 配置、日志、数据模型、策略基类
|
||
└────────┬─────────┘
|
||
│
|
||
┌────────────────────┼────────────────────┐
|
||
│ │ │
|
||
┌────▼────┐ ┌──────▼──────┐ ┌─────▼──────┐
|
||
│ base.py │ │ manager.py │ │ consumers/ │
|
||
│策略基类 │◄───────│ 策略管理器 │ │ 行情消费 │
|
||
└────┬────┘ └──────┬──────┘ └────────────┘
|
||
│ │
|
||
│ ┌──────▼──────┐
|
||
┌────▼────┐ │ signals.py │ ┌─────────┐
|
||
│strategies│ │ 信号分发器 │─────►│executor/│
|
||
│ 策略实现 │ └─────────────┘ └─────────┘
|
||
└────┬────┘
|
||
│
|
||
┌────▼────┐ ┌──────────────┐
|
||
│backtest │ │ optimizer.py │
|
||
│回测引擎 │◄───────│ 参数优化器 │
|
||
└─────────┘ └──────────────┘
|
||
```
|
||
|
||
### 设计原则
|
||
|
||
| 原则 | 说明 |
|
||
|------|------|
|
||
| **插件化** | 策略以插件形式注册,新增策略无需修改引擎核心代码 |
|
||
| **异步优先** | 全部 I/O 操作使用 `asyncio`,保证高并发下的低延迟 |
|
||
| **类型安全** | 核心接口用 `ABC` + Pydantic 模型约束,运行时校验 |
|
||
| **无状态** | 引擎本身不持有策略状态,状态由策略实例自行管理 |
|
||
| **错误隔离** | 单个策略异常不影响其他策略运行,也不影响引擎主循环 |
|
||
| **可观测** | 每个生命周期事件埋点,输出结构化日志和 Prometheus 指标 |
|
||
|
||
---
|
||
|
||
## 策略基类设计
|
||
|
||
### 抽象基类
|
||
|
||
```python
|
||
# engine/common/base.py
|
||
from abc import ABC, abstractmethod
|
||
from typing import Optional
|
||
from pydantic import BaseModel
|
||
|
||
from .models import Ticker, Kline, OrderBook, Trade
|
||
|
||
|
||
class StrategyConfig(BaseModel):
|
||
"""策略配置(由具体策略子类化扩展)"""
|
||
name: str
|
||
symbol: str
|
||
exchange: str = "binance"
|
||
enabled: bool = True
|
||
max_position_pct: float = 0.1 # 最大仓位比例
|
||
stop_loss_pct: Optional[float] = None # 止损百分比
|
||
take_profit_pct: Optional[float] = None # 止盈百分比
|
||
|
||
|
||
class Signal(BaseModel):
|
||
"""交易信号"""
|
||
symbol: str
|
||
side: str # "BUY" / "SELL"
|
||
signal_type: str # "MARKET" / "LIMIT" / "CANCEL"
|
||
price: Optional[float] = None # 限价单价格
|
||
quantity: Optional[float] = None # 下单数量
|
||
confidence: float = 1.0 # 信号置信度 [0, 1]
|
||
reason: str = "" # 信号生成原因(便于审计)
|
||
timestamp: float # 信号时间戳
|
||
|
||
|
||
class BaseStrategy(ABC):
|
||
"""所有策略的抽象基类"""
|
||
|
||
# 策略类型标识,子类必须覆盖
|
||
strategy_type: str = "base"
|
||
|
||
def __init__(self, config: StrategyConfig):
|
||
self.config = config
|
||
self.is_running = False
|
||
self.pnl = 0.0
|
||
self.trade_count = 0
|
||
|
||
# ── 生命周期钩子 ──
|
||
|
||
async def on_start(self) -> None:
|
||
"""策略启动时调用(加载初始状态、预热等)"""
|
||
self.is_running = True
|
||
|
||
async def on_stop(self) -> None:
|
||
"""策略停止时调用(清理、平仓等)"""
|
||
self.is_running = False
|
||
|
||
async def on_pause(self) -> None:
|
||
"""策略暂停时调用"""
|
||
pass
|
||
|
||
async def on_resume(self) -> None:
|
||
"""策略恢复时调用"""
|
||
pass
|
||
|
||
# ── 行情事件处理器(子类按需实现)──
|
||
|
||
@abstractmethod
|
||
async def on_kline(self, kline: Kline) -> Optional[Signal]:
|
||
"""处理 K 线数据,返回交易信号"""
|
||
...
|
||
|
||
async def on_ticker(self, ticker: Ticker) -> Optional[Signal]:
|
||
"""处理 Ticker 数据(可选实现)"""
|
||
return None
|
||
|
||
async def on_trade(self, trade: Trade) -> Optional[Signal]:
|
||
"""处理逐笔成交数据(可选实现)"""
|
||
return None
|
||
|
||
async def on_orderbook(self, orderbook: OrderBook) -> Optional[Signal]:
|
||
"""处理深度数据(可选实现)"""
|
||
return None
|
||
|
||
# ── 交易反馈钩子 ──
|
||
|
||
async def on_order_filled(self, order_result) -> None:
|
||
"""订单成交回调"""
|
||
pass
|
||
|
||
async def on_order_cancelled(self, order_id: str) -> None:
|
||
"""订单撤销回调"""
|
||
pass
|
||
|
||
# ── 辅助方法 ──
|
||
|
||
async def get_klines(
|
||
self, interval: str, limit: int = 100
|
||
) -> list[Kline]:
|
||
"""从 TimescaleDB 获取历史 K 线(回测和预热用)"""
|
||
...
|
||
|
||
def log(self, level: str, message: str, **kwargs) -> None:
|
||
"""结构化日志"""
|
||
...
|
||
```
|
||
|
||
### 策略状态机
|
||
|
||
```
|
||
register
|
||
│
|
||
▼
|
||
┌────────┐ start ┌─────────┐
|
||
│ LOADED │ ──────────► │ RUNNING │
|
||
└────────┘ └────┬─────┘
|
||
▲ │
|
||
│ ┌────────┴────────┐
|
||
│ ▼ ▼
|
||
│ ┌─────────┐ ┌──────────┐
|
||
└─────────│ PAUSED │ │ STOPPING │
|
||
unload └────┬─────┘ └────┬─────┘
|
||
│ resume │
|
||
└───► RUNNING │
|
||
▼
|
||
┌──────────┐
|
||
│ STOPPED │
|
||
└────┬─────┘
|
||
│ unload
|
||
▼
|
||
(removed)
|
||
```
|
||
|
||
### 策略生命周期事件流
|
||
|
||
```
|
||
on_start()
|
||
│
|
||
▼
|
||
┌──────────────────────────────────────────────────────────┐
|
||
│ 主循环(事件驱动) │
|
||
│ │
|
||
│ on_kline(kline) ──► Signal? ──► signal_bus.emit(signal) │
|
||
│ on_ticker(ticker) ─► Signal? ──► signal_bus.emit(signal) │
|
||
│ on_orderbook(ob) ──► Signal? ──► signal_bus.emit(signal) │
|
||
│ │
|
||
│ on_order_filled(result) ←── executor 回调 │
|
||
│ on_order_cancelled(id) ←── executor 回调 │
|
||
└──────────────────────────────────────────────────────────┘
|
||
│
|
||
▼
|
||
on_stop()
|
||
```
|
||
|
||
---
|
||
|
||
## 策略管理器
|
||
|
||
策略管理器是引擎的入口组件,负责所有策略实例的注册、发现、加载和生命周期调度。
|
||
|
||
### 接口设计
|
||
|
||
```python
|
||
# engine/manager.py
|
||
import importlib
|
||
import pkgutil
|
||
from pathlib import Path
|
||
from typing import Type, Optional
|
||
from .common.logger import logger
|
||
|
||
from .common import BaseStrategy, StrategyConfig, Signal
|
||
from .signals import SignalBus
|
||
|
||
|
||
class StrategyManager:
|
||
"""
|
||
策略管理器:策略注册、热加载、生命周期调度
|
||
|
||
用法:
|
||
manager = StrategyManager(signal_bus=signal_bus)
|
||
manager.discover_strategies("strategies")
|
||
await manager.start_all()
|
||
"""
|
||
|
||
def __init__(self, signal_bus: SignalBus):
|
||
self._strategies: dict[str, BaseStrategy] = {}
|
||
self._strategy_classes: dict[str, Type[BaseStrategy]] = {}
|
||
self.signal_bus = signal_bus
|
||
|
||
# ── 策略发现与注册 ──
|
||
|
||
def discover_strategies(self, package_path: str = "strategies") -> list[str]:
|
||
"""
|
||
自动发现策略包下的所有策略类。
|
||
|
||
约定:
|
||
- 策略文件放在 `strategies/` 目录下
|
||
- 策略类必须继承 `BaseStrategy` 且在其模块中定义
|
||
|
||
返回发现的策略类型名列表。
|
||
"""
|
||
...
|
||
|
||
def register(self, strategy_cls: Type[BaseStrategy]) -> None:
|
||
"""手动注册策略类"""
|
||
...
|
||
|
||
# ── 策略实例管理 ──
|
||
|
||
def create(
|
||
self, strategy_type: str, config: StrategyConfig
|
||
) -> BaseStrategy:
|
||
"""
|
||
根据策略类型创建实例。
|
||
|
||
Raises:
|
||
ValueError: 策略类型未注册
|
||
"""
|
||
...
|
||
|
||
async def start(self, name: str) -> None:
|
||
"""启动指定策略"""
|
||
...
|
||
|
||
async def stop(self, name: str) -> None:
|
||
"""停止指定策略"""
|
||
...
|
||
|
||
async def pause(self, name: str) -> None:
|
||
"""暂停指定策略(保留状态,不接收行情)"""
|
||
...
|
||
|
||
async def resume(self, name: str) -> None:
|
||
"""恢复暂停的策略"""
|
||
...
|
||
|
||
async def restart(self, name: str, new_config: Optional[StrategyConfig] = None) -> BaseStrategy:
|
||
"""重启策略(可选地更新配置)"""
|
||
...
|
||
|
||
def get(self, name: str) -> Optional[BaseStrategy]:
|
||
"""获取策略实例"""
|
||
...
|
||
|
||
def list_strategies(self) -> list[dict]:
|
||
"""列出所有策略的状态"""
|
||
...
|
||
|
||
# ── 批量操作 ──
|
||
|
||
async def start_all(self) -> None:
|
||
"""启动所有 enabled 策略"""
|
||
...
|
||
|
||
async def stop_all(self) -> None:
|
||
"""停止所有策略"""
|
||
...
|
||
|
||
# ── 行情入口(由主循环调用)──
|
||
|
||
async def feed_kline(self, kline) -> None:
|
||
"""将 K 线数据分发到对应 symbol 的策略"""
|
||
...
|
||
|
||
async def feed_ticker(self, ticker) -> None:
|
||
"""将 Ticker 分发到对应 symbol 的策略"""
|
||
...
|
||
|
||
async def feed_orderbook(self, orderbook) -> None:
|
||
"""将 OrderBook 分发到对应 symbol 的策略"""
|
||
...
|
||
|
||
# ── 交易反馈入口 ──
|
||
|
||
async def notify_order_filled(self, strategy_name: str, order_result) -> None:
|
||
"""通知策略订单已成交"""
|
||
...
|
||
|
||
# ── 统计与监控 ──
|
||
|
||
def stats(self) -> dict:
|
||
"""返回引擎统计信息"""
|
||
...
|
||
```
|
||
|
||
### 策略热加载机制
|
||
|
||
```
|
||
策略文件变更
|
||
│
|
||
▼
|
||
┌──────────────────┐
|
||
│ 文件监听器 │
|
||
│ (watchdog) │
|
||
└────────┬─────────┘
|
||
│
|
||
┌──────────▼──────────┐
|
||
│ 重新 importlib.reload│
|
||
└──────────┬──────────┘
|
||
│
|
||
┌──────────▼──────────┐
|
||
│ 更新 _strategy_classes│
|
||
└──────────┬──────────┘
|
||
│
|
||
┌──────────▼──────────┐
|
||
│ 重新创建策略实例 │
|
||
│ (stop → create → start)│
|
||
└─────────────────────┘
|
||
```
|
||
|
||
---
|
||
|
||
## 信号分发器
|
||
|
||
信号分发器是策略到交易执行模块之间的**解耦层**,提供事件总线能力。
|
||
|
||
### 设计目标
|
||
|
||
- **解耦**:策略不直接调用交易 API,只 emit 信号
|
||
- **过滤**:同一 symbol 的冲突信号自动去重/合并
|
||
- **优先级**:支持信号优先级排序(高置信度优先)
|
||
- **限流**:防止策略过度交易,内置冷却时间
|
||
- **审计**:所有信号记录到数据库,可追溯
|
||
|
||
### 接口设计
|
||
|
||
```python
|
||
# engine/signals.py
|
||
import asyncio
|
||
from collections import defaultdict
|
||
from typing import Callable, Awaitable
|
||
|
||
from .common import Signal
|
||
|
||
|
||
SignalHandler = Callable[[Signal], Awaitable[None]]
|
||
|
||
|
||
class SignalBus:
|
||
"""信号事件总线"""
|
||
|
||
def __init__(self):
|
||
self._handlers: dict[str, list[SignalHandler]] = defaultdict(list)
|
||
self._signal_history: list[Signal] = []
|
||
self._cooldowns: dict[str, float] = {} # symbol -> 下次允许信号时间
|
||
|
||
def subscribe(self, event_type: str, handler: SignalHandler) -> None:
|
||
"""
|
||
订阅信号类型。
|
||
|
||
event_type: "order" / "alert" / "log" 或自定义
|
||
handler: async callable,接收 Signal 参数
|
||
"""
|
||
...
|
||
|
||
async def emit(self, signal: Signal) -> None:
|
||
"""
|
||
发布信号。会经过以下处理链:
|
||
1. 冷却检查
|
||
2. 冲突检测
|
||
3. 优先级排序
|
||
4. 分发给所有订阅者
|
||
5. 记录到审计日志
|
||
"""
|
||
...
|
||
|
||
def set_cooldown(self, symbol: str, seconds: float) -> None:
|
||
"""设置 symbol 的下单冷却时间"""
|
||
...
|
||
|
||
def recent_signals(self, symbol: str, n: int = 10) -> list[Signal]:
|
||
"""获取最近的信号历史"""
|
||
...
|
||
|
||
|
||
class SignalFilter:
|
||
"""信号过滤器基类(责任链模式)"""
|
||
|
||
async def filter(self, signal: Signal) -> Signal | None:
|
||
"""返回过滤后的信号,返回 None 表示丢弃"""
|
||
return signal
|
||
|
||
|
||
class CooldownFilter(SignalFilter):
|
||
"""冷却时间过滤器"""
|
||
...
|
||
|
||
|
||
class DuplicateFilter(SignalFilter):
|
||
"""重复信号过滤器(同一 symbol 短期内相同方向合并)"""
|
||
...
|
||
|
||
|
||
class ConfidenceFilter(SignalFilter):
|
||
"""置信度过滤器(低于阈值的信号丢弃)"""
|
||
...
|
||
```
|
||
|
||
### 信号流向
|
||
|
||
```
|
||
策略A.on_kline() ──► Signal ──┐
|
||
策略B.on_ticker() ─► Signal ──┤
|
||
策略C.on_kline() ──► Signal ──┤
|
||
▼
|
||
┌────────────────┐
|
||
│ SignalBus │
|
||
│ │
|
||
│ ┌──────────┐ │
|
||
│ │ 冷却检查 │ │
|
||
│ └────┬─────┘ │
|
||
│ ┌────▼─────┐ │
|
||
│ │ 去重合并 │ │
|
||
│ └────┬─────┘ │
|
||
│ ┌────▼─────┐ │
|
||
│ │ 优先级排序│ │
|
||
│ └────┬─────┘ │
|
||
└───────┼────────┘
|
||
│
|
||
┌─────────────┼─────────────┐
|
||
▼ ▼ ▼
|
||
┌──────────┐ ┌──────────┐ ┌──────────┐
|
||
│ executor │ │ risk/ │ │ monitor/ │
|
||
│ 交易执行 │ │ 风控 │ │ 日志 │
|
||
└──────────┘ └──────────┘ └──────────┘
|
||
```
|
||
|
||
---
|
||
|
||
## 回测引擎
|
||
|
||
回测引擎使用历史 K 线数据模拟策略执行,评估策略在历史行情中的表现。
|
||
|
||
### 设计目标
|
||
|
||
- **事件驱动**:模拟真实行情推送,按时间顺序逐根 K 线喂给策略
|
||
- **精确模拟**:考虑手续费、滑点、最小下单量等实际交易约束
|
||
- **多维度指标**:总收益率、年化收益、夏普比率、最大回撤、胜率、盈亏比
|
||
- **可视化输出**:资金曲线图、回撤曲线、交易点标注
|
||
|
||
### 架构
|
||
|
||
```
|
||
┌──────────────────────────┐
|
||
│ 回测引擎 │
|
||
│ │
|
||
历史K线数据 ─────►│ ┌────────────────────┐ │
|
||
(TimescaleDB) │ │ BacktestEngine │ │
|
||
│ │ │ │
|
||
策略配置 ────────►│ │ - 加载历史K线 │ │
|
||
│ │ - 逐根推送给策略 │ │
|
||
策略类 ──────────►│ │ - 模拟订单成交 │ │
|
||
│ │ - 计算性能指标 │ │
|
||
│ │ - 生成回测报告 │ │
|
||
│ └────────────────────┘ │
|
||
└──────────┬───────────────┘
|
||
│
|
||
▼
|
||
┌──────────────────────────┐
|
||
│ 回测结果 │
|
||
│ - equity_curve │
|
||
│ - trades │
|
||
│ - metrics │
|
||
│ - report (HTML/JSON) │
|
||
└──────────────────────────┘
|
||
```
|
||
|
||
### 接口设计
|
||
|
||
```python
|
||
# engine/backtest.py
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from typing import Optional, Type
|
||
from pydantic import BaseModel
|
||
|
||
from .common import BaseStrategy, StrategyConfig
|
||
|
||
|
||
@dataclass
|
||
class BacktestConfig:
|
||
"""回测配置"""
|
||
symbol: str
|
||
exchange: str = "binance"
|
||
interval: str = "1h"
|
||
start_time: datetime
|
||
end_time: datetime
|
||
|
||
# 交易成本
|
||
commission_pct: float = 0.001 # 手续费率 (0.1%)
|
||
slippage_pct: float = 0.0005 # 滑点 (0.05%)
|
||
min_order_qty: float = 0.001 # 最小下单量
|
||
|
||
# 资金
|
||
initial_capital: float = 10_000.0 # 初始资金
|
||
|
||
# 数据
|
||
warmup_bars: int = 100 # 预热 K 线条数
|
||
|
||
|
||
@dataclass
|
||
class BacktestTrade:
|
||
"""单笔回测交易记录"""
|
||
timestamp: datetime
|
||
symbol: str
|
||
side: str
|
||
price: float
|
||
quantity: float
|
||
commission: float
|
||
slippage: float
|
||
pnl: Optional[float] = None
|
||
reason: str = ""
|
||
|
||
|
||
@dataclass
|
||
class BacktestMetrics:
|
||
"""回测性能指标"""
|
||
total_return_pct: float # 总收益率
|
||
annual_return_pct: float # 年化收益率
|
||
sharpe_ratio: float # 夏普比率
|
||
sortino_ratio: float # 索提诺比率
|
||
max_drawdown_pct: float # 最大回撤
|
||
max_drawdown_duration: int # 最大回撤持续天数
|
||
win_rate: float # 胜率
|
||
profit_factor: float # 盈亏比
|
||
total_trades: int # 总交易次数
|
||
avg_holding_hours: float # 平均持仓时间(小时)
|
||
calmar_ratio: float # 卡尔玛比率
|
||
|
||
|
||
@dataclass
|
||
class BacktestResult:
|
||
"""完整回测结果"""
|
||
config: BacktestConfig
|
||
strategy_config: StrategyConfig
|
||
metrics: BacktestMetrics
|
||
trades: list[BacktestTrade]
|
||
equity_curve: list[dict] # [{timestamp, equity, drawdown}, ...]
|
||
|
||
|
||
class BacktestEngine:
|
||
"""回测引擎"""
|
||
|
||
def __init__(self, config: BacktestConfig):
|
||
self.config = config
|
||
self._klines: list = []
|
||
self._trades: list[BacktestTrade] = []
|
||
self._equity: list[dict] = []
|
||
|
||
async def load_data(self) -> None:
|
||
"""从 TimescaleDB 加载历史 K 线数据"""
|
||
...
|
||
|
||
async def run(
|
||
self,
|
||
strategy_cls: Type[BaseStrategy],
|
||
strategy_config: StrategyConfig,
|
||
) -> BacktestResult:
|
||
"""
|
||
执行回测。
|
||
|
||
流程:
|
||
1. 预热:先喂 warmup_bars 根 K 线但不产生交易
|
||
2. 回测循环:逐根 K 线推送给策略
|
||
3. 收到 Signal 时模拟订单成交
|
||
4. 每根 K 线更新资金曲线
|
||
5. 计算最终指标
|
||
"""
|
||
...
|
||
|
||
async def run_batch(
|
||
self,
|
||
strategy_cls: Type[BaseStrategy],
|
||
configs: list[StrategyConfig],
|
||
max_concurrency: int = 4,
|
||
) -> list[BacktestResult]:
|
||
"""批量回测(并行执行,用于参数扫描)"""
|
||
...
|
||
|
||
@staticmethod
|
||
def compute_metrics(
|
||
equity_curve: list[dict],
|
||
trades: list[BacktestTrade],
|
||
config: BacktestConfig,
|
||
) -> BacktestMetrics:
|
||
"""从资金曲线和交易记录计算性能指标"""
|
||
...
|
||
|
||
def generate_report(self, result: BacktestResult, format: str = "json") -> str:
|
||
"""生成回测报告(json/html/markdown)"""
|
||
...
|
||
|
||
def plot(self, result: BacktestResult) -> None:
|
||
"""绘制资金曲线和回撤图(matplotlib)"""
|
||
...
|
||
```
|
||
|
||
### 回测执行流程
|
||
|
||
```
|
||
开始
|
||
│
|
||
▼
|
||
┌─────────────────────┐
|
||
│ 加载历史 K 线数据 │
|
||
│ (TimescaleDB) │
|
||
└──────────┬──────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────┐
|
||
│ 预热阶段 │
|
||
│ 喂入 warmup_bars │
|
||
│ 策略初始化指标 │
|
||
└──────────┬──────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────┐
|
||
│ 回测主循环 │
|
||
│ │
|
||
│ for kline in data: │
|
||
│ signal = strat. │
|
||
│ on_kline(kline)│
|
||
│ if signal: │
|
||
│ simulate_trade │
|
||
│ update_equity() │
|
||
│ │
|
||
└──────────┬──────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────┐
|
||
│ 计算性能指标 │
|
||
│ - 收益率 │
|
||
│ - 夏普比率 │
|
||
│ - 最大回撤 │
|
||
│ - 胜率等 │
|
||
└──────────┬──────────┘
|
||
│
|
||
▼
|
||
┌─────────────────────┐
|
||
│ 生成回测报告 │
|
||
│ - JSON 结果 │
|
||
│ - 资金曲线图 │
|
||
│ - HTML 报告 │
|
||
└─────────────────────┘
|
||
```
|
||
|
||
### 交易模拟逻辑
|
||
|
||
```
|
||
收到 Signal
|
||
│
|
||
▼
|
||
┌──────────────┐
|
||
│ 输入价格 = │
|
||
│ kline.close │
|
||
│ + slippage │
|
||
└──────┬───────┘
|
||
│
|
||
▼
|
||
┌──────────────┐
|
||
│ 检查余额 │──── 不足 ──► 跳过,记录日志
|
||
│ 检查最小下单量 │
|
||
└──────┬───────┘
|
||
│ 通过
|
||
▼
|
||
┌──────────────┐
|
||
│ 执行成交 │
|
||
│ - 更新持仓 │
|
||
│ - 扣除手续费 │
|
||
│ - 记录交易 │
|
||
│ - 计算盈亏 │
|
||
└──────────────┘
|
||
```
|
||
|
||
---
|
||
|
||
## 参数优化器
|
||
|
||
参数优化器基于回测引擎,自动搜索最优策略参数组合。
|
||
|
||
### 支持的优化策略
|
||
|
||
| 方法 | 适用场景 | 特点 |
|
||
|------|---------|------|
|
||
| **网格搜索** | 参数空间小(2-3 维) | 穷举所有组合,结果完备但慢 |
|
||
| **随机搜索** | 参数空间中等 | 比网格搜索更高效 |
|
||
| **贝叶斯优化** | 参数空间大 | 利用先验知识,收敛快 |
|
||
| **遗传算法** | 复杂非线性参数空间 | 全局搜索能力强 |
|
||
|
||
### 接口设计
|
||
|
||
```python
|
||
# engine/optimizer.py
|
||
from dataclasses import dataclass, field
|
||
from typing import Callable, Any
|
||
|
||
from backtest import BacktestEngine, BacktestConfig, BacktestResult
|
||
from common import BaseStrategy, StrategyConfig
|
||
|
||
|
||
@dataclass
|
||
class ParamSpec:
|
||
"""参数规格定义"""
|
||
name: str
|
||
type: str # "int" / "float" / "choice"
|
||
low: Optional[float] = None # 数值参数下界
|
||
high: Optional[float] = None # 数值参数上界
|
||
step: Optional[float] = None # 步长(网格搜索用)
|
||
choices: Optional[list[Any]] = None # 离散选择
|
||
log_scale: bool = False # 是否对数尺度搜索
|
||
|
||
|
||
@dataclass
|
||
class OptimizationConfig:
|
||
"""优化器配置"""
|
||
method: str = "bayesian" # "grid" / "random" / "bayesian" / "genetic"
|
||
objective: str = "sharpe_ratio" # 优化目标指标
|
||
max_iterations: int = 100 # 最大迭代次数
|
||
n_jobs: int = 4 # 并行任务数
|
||
early_stopping_rounds: int = 10 # 早停轮数
|
||
cv_folds: int = 3 # 交叉验证折数
|
||
|
||
|
||
@dataclass
|
||
class OptimizationResult:
|
||
"""单次优化结果"""
|
||
params: dict[str, Any] # 最优参数
|
||
score: float # 目标函数值
|
||
all_trials: list[dict] # 所有试验记录
|
||
best_result: BacktestResult # 最优参数的回测结果
|
||
|
||
|
||
class ParamOptimizer:
|
||
"""参数优化器"""
|
||
|
||
def __init__(
|
||
self,
|
||
backtest_config: BacktestConfig,
|
||
strategy_cls: Type[BaseStrategy],
|
||
base_config: StrategyConfig,
|
||
param_specs: list[ParamSpec],
|
||
opt_config: OptimizationConfig = OptimizationConfig(),
|
||
):
|
||
...
|
||
|
||
async def optimize(self) -> OptimizationResult:
|
||
"""
|
||
执行参数优化。
|
||
|
||
返回最优参数组合及对应的回测结果。
|
||
"""
|
||
...
|
||
|
||
def suggest_params(self, n: int = 5) -> list[dict]:
|
||
"""获取 top-n 参数组合建议"""
|
||
...
|
||
|
||
def plot_importance(self) -> None:
|
||
"""绘制参数重要性图"""
|
||
...
|
||
|
||
def plot_parallel_coordinates(self) -> None:
|
||
"""绘制参数平行坐标图"""
|
||
...
|
||
```
|
||
|
||
### 优化流程
|
||
|
||
```
|
||
┌────────────────┐
|
||
│ 定义参数空间 │
|
||
│ ParamSpec[] │
|
||
└───────┬────────┘
|
||
│
|
||
▼
|
||
┌────────────────┐
|
||
│ 选择优化算法 │
|
||
│ (网格/贝叶斯等) │
|
||
└───────┬────────┘
|
||
│
|
||
┌────────▼────────┐
|
||
│ 优化主循环 │
|
||
│ │
|
||
│ while n < max: │
|
||
│ params = │
|
||
│ suggest() │
|
||
│ result = │
|
||
│ backtest() │
|
||
│ score = │
|
||
│ metric() │
|
||
│ update() │
|
||
│ check_early()│
|
||
│ │
|
||
└────────┬────────┘
|
||
│
|
||
▼
|
||
┌────────────────┐
|
||
│ 返回最优结果 │
|
||
│ + 所有试验记录 │
|
||
└────────────────┘
|
||
```
|
||
|
||
### 目标函数
|
||
|
||
```
|
||
目标函数可选指标(最小化回撤时取负值):
|
||
|
||
sharpe_ratio — 夏普比率(风险调整后收益,推荐)
|
||
sortino_ratio — 索提诺比率(只考虑下行波动)
|
||
total_return — 总收益率
|
||
calmar_ratio — 卡尔玛比率(收益/最大回撤)
|
||
profit_factor — 盈亏比
|
||
custom — 自定义组合指标
|
||
```
|
||
|
||
---
|
||
|
||
## 数据流与生命周期
|
||
|
||
### 实时交易数据流
|
||
|
||
```
|
||
Redis Pub/Sub engine/ executor/
|
||
───────────── ───────── ─────────
|
||
|
||
market:kline:* ──► RedisConsumer ──► Manager.feed_kline()
|
||
│
|
||
▼
|
||
路由到对应 symbol 策略
|
||
│
|
||
┌─────┴─────┐
|
||
▼ ▼
|
||
策略A.on_kline 策略B.on_kline
|
||
│ │
|
||
Signal? Signal?
|
||
│ │
|
||
└─────┬─────┘
|
||
▼
|
||
SignalBus.emit()
|
||
│
|
||
┌───────────┼───────────┐
|
||
▼ ▼ ▼
|
||
冷却检查 去重合并 优先级排序
|
||
│ │ │
|
||
└───────────┼───────────┘
|
||
▼
|
||
┌──────────┐
|
||
│ risk/ │ ← 风控检查
|
||
│ manager │
|
||
└────┬─────┘
|
||
│ passed
|
||
▼
|
||
┌──────────┐
|
||
│ executor │ ← 实际下单
|
||
└──────────┘
|
||
│
|
||
▼
|
||
订单状态回调
|
||
│
|
||
▼
|
||
Manager.notify_order_filled()
|
||
│
|
||
▼
|
||
策略.on_order_filled()
|
||
```
|
||
|
||
### 回测数据流
|
||
|
||
```
|
||
TimescaleDB ──► BacktestEngine.load_data()
|
||
│
|
||
▼
|
||
历史 K 线列表
|
||
│
|
||
▼
|
||
BacktestEngine.run()
|
||
│
|
||
▼
|
||
逐根推送 K 线 ──► 策略.on_kline()
|
||
│ │
|
||
│ Signal?
|
||
│ │
|
||
▼ ▼
|
||
更新资金曲线 simulate_trade()
|
||
│
|
||
▼
|
||
计算指标 ──► BacktestResult
|
||
```
|
||
|
||
### 引擎主循环
|
||
|
||
```python
|
||
# engine/main.py — 主程序入口示意
|
||
import asyncio
|
||
from config import settings
|
||
from common.storage import TimescaleDBReader
|
||
from manager import StrategyManager
|
||
from signals import SignalBus
|
||
|
||
|
||
async def main():
|
||
# 1. 初始化信号总线
|
||
signal_bus = SignalBus()
|
||
|
||
# 2. 初始化策略管理器
|
||
manager = StrategyManager(signal_bus)
|
||
manager.discover_strategies("strategies")
|
||
|
||
# 3. 创建并注册策略实例
|
||
for cfg in settings.strategies:
|
||
manager.create(cfg.type, cfg)
|
||
|
||
# 4. 启动所有策略
|
||
await manager.start_all()
|
||
|
||
# 5. 订阅 Redis 行情
|
||
consumer = RedisMarketConsumer(settings.redis_url)
|
||
await consumer.subscribe_kline(manager.feed_kline)
|
||
await consumer.subscribe_ticker(manager.feed_ticker)
|
||
|
||
# 6. 保持运行
|
||
await asyncio.Event().wait()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|
||
```
|
||
|
||
---
|
||
|
||
## 与上下游模块的交互
|
||
|
||
### 上游:TypeScript 数据模块
|
||
|
||
引擎通过 Redis Pub/Sub 消费 TS 侧推送的行情数据:
|
||
|
||
| Redis 频道 | 数据类型 | 对应策略方法 |
|
||
|-----------|---------|-------------|
|
||
| `market:ticker:{exchange}:{symbol}` | Ticker | `on_ticker()` |
|
||
| `market:kline:{exchange}:{symbol}:{interval}` | Kline | `on_kline()` |
|
||
| `market:trade:{exchange}:{symbol}` | Trade | `on_trade()` |
|
||
| `market:orderbook:{exchange}:{symbol}` | OrderBook | `on_orderbook()` |
|
||
|
||
数据格式使用 **MessagePack** 序列化,Python 侧用 `msgpack.unpackb()` 反序列化为 Pydantic 模型。
|
||
|
||
### 下游:交易执行模块
|
||
|
||
引擎通过 `SignalBus` 发布信号,executor 订阅:
|
||
|
||
```python
|
||
# executor 侧订阅
|
||
signal_bus.subscribe("order", executor.handle_signal)
|
||
```
|
||
|
||
### 下游:风控模块
|
||
|
||
风控模块也订阅信号总线,在 executor 执行前进行风控检查:
|
||
|
||
```python
|
||
# risk 侧订阅
|
||
signal_bus.subscribe("order", risk_manager.check_signal)
|
||
```
|
||
|
||
### 下游:监控模块
|
||
|
||
监控模块订阅所有信号用于记录和统计:
|
||
|
||
```python
|
||
# monitor 侧订阅
|
||
signal_bus.subscribe("*", monitor.record_signal)
|
||
```
|
||
|
||
---
|
||
|
||
## 目录结构
|
||
|
||
```
|
||
engine/
|
||
├── ENGINE.md # 本文档
|
||
├── __init__.py
|
||
├── env.yaml # 引擎环境配置
|
||
│
|
||
├── common/ # 通用模块(基础数据类型、策略基类、日志)
|
||
│ ├── __init__.py
|
||
│ ├── base.py # BaseStrategy 抽象基类 + Signal / StrategyConfig
|
||
│ ├── models.py # 数据模型:Ticker / Kline / Trade / OrderBook
|
||
│ └── logger.py # 结构化日志模块
|
||
│
|
||
├── manager.py # StrategyManager 策略管理器
|
||
├── signals.py # SignalBus 信号分发器 + SignalFilter 过滤器
|
||
├── backtest.py # BacktestEngine 回测引擎
|
||
├── optimizer.py # ParamOptimizer 参数优化器
|
||
├── main.py # 引擎主循环入口
|
||
│
|
||
├── indicators/ # 技术指标计算(基于 TA-Lib / pandas_ta)
|
||
│ ├── __init__.py
|
||
│ ├── trend.py # 趋势指标 (MA, EMA, MACD, ADX...)
|
||
│ ├── momentum.py # 动量指标 (RSI, Stochastic, CCI...)
|
||
│ ├── volatility.py # 波动率指标 (Bollinger, ATR, Keltner...)
|
||
│ ├── volume.py # 成交量指标 (OBV, VWAP, MFI...)
|
||
│ └── composite.py # 复合指标(自定义组合)
|
||
│
|
||
├── consumers/ # 行情消费器(Redis → 策略馈送)
|
||
│ ├── __init__.py
|
||
│ ├── redis.py # Redis Pub/Sub 消费者
|
||
│ └── nats.py # NATS 消费者(可选)
|
||
│
|
||
└── tests/ # 引擎测试
|
||
├── __init__.py
|
||
├── test_base.py
|
||
├── test_manager.py
|
||
├── test_signals.py
|
||
├── test_backtest.py
|
||
├── test_optimizer.py
|
||
└── fixtures/ # 测试夹具(模拟行情数据)
|
||
└── sample_klines.json
|
||
```
|
||
|
||
---
|
||
|
||
## 开发步骤
|
||
|
||
### 第一阶段:核心框架(第 1 周)
|
||
|
||
| 步骤 | 任务 | 产出 |
|
||
|------|------|------|
|
||
| 1.1 | 实现 `common/base.py` — `BaseStrategy`、`Signal`、`StrategyConfig` | 策略基类和类型定义 |
|
||
| 1.2 | 实现 `common/models.py` — `Ticker`、`Kline`、`Trade`、`OrderBook` Pydantic 模型 | 与 TS 对齐的数据模型 |
|
||
| 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 |
|
||
| 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 |
|
||
| 1.4 | 实现 `signals.py` — `SignalBus`、`CooldownFilter`、`DuplicateFilter` | 事件总线和过滤链 |
|
||
| 1.5 | 实现 `manager.py` — 策略注册、创建、启停 | 策略管理器 |
|
||
| 1.6 | 实现 `consumers/redis.py` — Redis Pub/Sub 消费者 | 实时行情接收 |
|
||
| 1.7 | 编写 `main.py` — 引擎主循环串联 | 端到端可运行 |
|
||
|
||
### 第二阶段:指标与策略示例(第 2 周)
|
||
|
||
| 步骤 | 任务 | 产出 |
|
||
|------|------|------|
|
||
| 2.1 | 实现 `indicators/` — MA、EMA、MACD、RSI、Bollinger | 技术指标库 |
|
||
| 2.2 | 编写示例策略 1:双均线交叉 | `strategies/ma_cross.py` |
|
||
| 2.3 | 编写示例策略 2:网格交易 | `strategies/grid_trading.py` |
|
||
| 2.4 | 集成测试:Redis 行情 → 策略 → 信号 | 端到端验证 |
|
||
|
||
### 第三阶段:回测(第 3 周)
|
||
|
||
| 步骤 | 任务 | 产出 |
|
||
|------|------|------|
|
||
| 3.1 | 实现 `backtest.py` — 回测引擎核心 | 事件驱动回测 |
|
||
| 3.2 | 实现交易模拟逻辑(手续费、滑点) | 精确回测 |
|
||
| 3.3 | 实现性能指标计算(夏普比率、最大回撤等) | 完整指标 |
|
||
| 3.4 | 实现回测报告生成(JSON/HTML) | 可视化报告 |
|
||
| 3.5 | 编写回测测试用例 | 回测验证 |
|
||
|
||
### 第四阶段:参数优化(第 4 周)
|
||
|
||
| 步骤 | 任务 | 产出 |
|
||
|------|------|------|
|
||
| 4.1 | 实现 `optimizer.py` — 网格搜索 | 穷举搜索 |
|
||
| 4.2 | 集成 Optuna 实现贝叶斯优化 | 智能搜索 |
|
||
| 4.3 | 实现交叉验证 | 防止过拟合 |
|
||
| 4.4 | 实现参数重要性分析 | 参数洞察 |
|
||
|
||
---
|
||
|
||
## 实现规范
|
||
|
||
### 异步编程规范
|
||
|
||
- 所有 I/O 操作使用 `async/await`
|
||
- 长时间计算任务使用 `asyncio.to_thread()` 放入线程池
|
||
- 避免在协程中阻塞(不用 `time.sleep`,用 `asyncio.sleep`)
|
||
- 策略的 `on_*` 方法必须是非阻塞的快速返回
|
||
|
||
### 错误处理
|
||
|
||
```python
|
||
# 策略错误隔离模式
|
||
async def feed_kline(self, kline) -> None:
|
||
for name, strategy in self._strategies.items():
|
||
if not strategy.is_running:
|
||
continue
|
||
if strategy.config.symbol != kline.symbol:
|
||
continue
|
||
try:
|
||
signal = await strategy.on_kline(kline)
|
||
if signal:
|
||
await self.signal_bus.emit(signal)
|
||
except Exception as e:
|
||
logger.error(f"Strategy {name} error: {e}", exc_info=True)
|
||
# 单策略异常不影响其他策略
|
||
```
|
||
|
||
### 日志规范
|
||
|
||
```python
|
||
from common.logger import logger
|
||
|
||
# 使用结构化日志
|
||
logger.info("strategy_signal", strategy="ma_cross", symbol="BTCUSDT",
|
||
side="BUY", confidence=0.85, timestamp=1234567890)
|
||
logger.warning("strategy_error", strategy="grid", error=str(e))
|
||
logger.info("backtest_complete", symbol="ETHUSDT", sharpe=1.45, trades=230)
|
||
```
|
||
|
||
### 类型规范
|
||
|
||
- 所有公开接口使用类型注解
|
||
- 数据模型使用 Pydantic `BaseModel`
|
||
- 策略配置继承 `StrategyConfig` 并扩展
|
||
- 信号必须通过 `Signal` 模型创建,禁止使用裸 dict
|
||
|
||
### 测试规范
|
||
|
||
- 核心逻辑(SignalBus、StrategyManager)单元测试覆盖率 > 90%
|
||
- 策略逻辑有独立单元测试
|
||
- 回测引擎用已知结果的静态数据集验证
|
||
- 使用 `pytest-asyncio` 支持异步测试
|
||
|
||
### 性能基准
|
||
|
||
| 指标 | 目标值 | 说明 |
|
||
|------|--------|------|
|
||
| 策略单次 `on_kline` 耗时 | < 1ms | 不含 I/O 的纯计算 |
|
||
| SignalBus 发送延迟 | < 0.1ms | emit 到 handler 开始执行 |
|
||
| 回测速度 | > 10000 bars/s | 10 万根 K 线 < 10 秒 |
|
||
| 100 策略并行 | CPU < 50% | 空策略情况下的资源占用 |
|
||
|
||
---
|
||
|
||
## 附录
|
||
|
||
### A. 示例策略实现(双均线交叉)
|
||
|
||
```python
|
||
# strategies/ma_cross.py
|
||
from typing import Optional
|
||
from common import BaseStrategy, StrategyConfig, Signal
|
||
|
||
|
||
class MACrossConfig(StrategyConfig):
|
||
fast_period: int = 7
|
||
slow_period: int = 25
|
||
|
||
|
||
class MACrossStrategy(BaseStrategy):
|
||
strategy_type = "ma_cross"
|
||
|
||
def __init__(self, config: MACrossConfig):
|
||
super().__init__(config)
|
||
self.config: MACrossConfig = config
|
||
self._prices: list[float] = []
|
||
self._last_signal: Optional[str] = None # "BUY" / "SELL"
|
||
|
||
def _sma(self, data: list[float], period: int) -> float:
|
||
if len(data) < period:
|
||
return 0.0
|
||
return sum(data[-period:]) / period
|
||
|
||
async def on_kline(self, kline) -> Optional[Signal]:
|
||
self._prices.append(kline.close)
|
||
|
||
fast = self._sma(self._prices, self.config.fast_period)
|
||
slow = self._sma(self._prices, self.config.slow_period)
|
||
|
||
if fast == 0 or slow == 0:
|
||
return None
|
||
|
||
if fast > slow and self._last_signal != "BUY":
|
||
self._last_signal = "BUY"
|
||
return Signal(
|
||
symbol=self.config.symbol,
|
||
side="BUY",
|
||
signal_type="MARKET",
|
||
confidence=0.7,
|
||
reason=f"Golden cross: MA{self.config.fast_period} > MA{self.config.slow_period}",
|
||
timestamp=kline.timestamp,
|
||
)
|
||
|
||
if fast < slow and self._last_signal != "SELL":
|
||
self._last_signal = "SELL"
|
||
return Signal(
|
||
symbol=self.config.symbol,
|
||
side="SELL",
|
||
signal_type="MARKET",
|
||
confidence=0.7,
|
||
reason=f"Death cross: MA{self.config.fast_period} < MA{self.config.slow_period}",
|
||
timestamp=kline.timestamp,
|
||
)
|
||
|
||
return None
|
||
```
|
||
|
||
### B. 引擎配置示例(engine/env.yaml)
|
||
|
||
```yaml
|
||
# engine/env.yaml
|
||
engine:
|
||
name: "trade-engine"
|
||
strategies_package: "strategies"
|
||
max_concurrent_strategies: 50
|
||
|
||
redis:
|
||
url: "redis://localhost:6379/0"
|
||
channels:
|
||
kline: "market:kline:*"
|
||
ticker: "market:ticker:*"
|
||
orderbook: "market:orderbook:*"
|
||
|
||
signals:
|
||
cooldown_seconds: 1.0 # 同 symbol 下单冷却时间
|
||
min_confidence: 0.3 # 最低信号置信度
|
||
max_signals_per_minute: 60 # 每分钟最大信号数
|
||
|
||
backtest:
|
||
default_commission: 0.001
|
||
default_slippage: 0.0005
|
||
default_capital: 10000.0
|
||
|
||
optimizer:
|
||
default_method: "bayesian"
|
||
default_objective: "sharpe_ratio"
|
||
max_iterations: 200
|
||
n_jobs: 4
|
||
early_stopping: 20
|
||
```
|
||
|
||
### C. 相关文档索引
|
||
|
||
| 文档 | 路径 |
|
||
|------|------|
|
||
| 系统总体架构 | `../README.md` |
|
||
| 数据模块开发指南 | `../data/ENGINE.md` (待创建) |
|
||
| 交易执行模块 | `../executor/ENGINE.md` (待创建) |
|
||
| 风控模块 | `../risk/ENGINE.md` (待创建) |
|
||
| 公共模块 | `./common/__init__.py` |
|