Files
Rekey 4d66a86234 docs: 添加项目规范与引擎设计文档
- AGENTS.md: AI Agent 规则,定义项目运行环境、架构约定、常用命令
- ENGINE.md: Python 引擎架构设计文档,涵盖策略管理、信号总线、回测引擎等模块设计
2026-06-12 10:26:30 +08:00

1338 lines
46 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 策略引擎开发文档
> 数字货币量化交易系统 — 策略引擎模块(Python 3.10+)设计与开发指南
---
## 目录
1. [概述与定位](#概述与定位)
2. [核心架构](#核心架构)
3. [策略基类设计](#策略基类设计)
4. [策略管理器](#策略管理器)
5. [信号分发器](#信号分发器)
6. [回测引擎](#回测引擎)
7. [参数优化器](#参数优化器)
8. [数据流与生命周期](#数据流与生命周期)
9. [与上下游模块的交互](#与上下游模块的交互)
10. [目录结构](#目录结构)
11. [开发步骤](#开发步骤)
12. [实现规范](#实现规范)
---
## 概述与定位
策略引擎是整个量化交易系统的**核心调度中心**,位于 Python 业务层,负责:
- **策略生命周期管理**:注册、热加载、启动、暂停、停止、卸载策略
- **实时信号生成**:消费 TS 数据模块推送的行情数据,驱动策略产生交易信号
- **回测验证**:使用历史数据模拟策略执行,评估收益、回撤、夏普比率等指标
- **参数搜索**:通过网格搜索或贝叶斯优化自动寻找最优策略参数
- **下游集成**:将交易信号通过信号分发器路由到交易执行模块和风控模块
### 在系统中的位置
```
┌──────────────────────────┐
│ 策略引擎 🐍 Python │
│ │
Redis Pub/Sub ──────► │ ┌────────────────────┐ │
行情数据 │ │ 信号分发器 │──► executor/
│ └────────────────────┘ │
TimescaleDB ◄─────── │ ┌────────────────────┐ │
历史数据(回测) │ │ 策略管理器 │ │
│ │ ┌──────┬──────┐ │ │
│ │ │策略A │策略B │ │ │
│ │ └──────┴──────┘ │ │
│ └────────────────────┘ │
│ ┌────────────────────┐ │
│ │ 回测引擎 │ │
│ └────────────────────┘ │
│ ┌────────────────────┐ │
│ │ 参数优化器 │ │
│ └────────────────────┘ │
└──────────────────────────┘
```
---
## 核心架构
### 模块依赖图
```
┌──────────────────┐
│ engine/common/ │ ← 配置、日志、数据模型、策略基类
└────────┬─────────┘
┌────────────────────┼────────────────────┐
│ │ │
┌────▼────┐ ┌──────▼──────┐ ┌─────▼──────┐
│ base.py │ │ manager.py │ │ consumers/ │
│策略基类 │◄───────│ 策略管理器 │ │ 行情消费 │
└────┬────┘ └──────┬──────┘ └────────────┘
│ │
│ ┌──────▼──────┐
┌────▼────┐ │ signals.py │ ┌─────────┐
│strategies│ │ 信号分发器 │─────►│executor/│
│ 策略实现 │ └─────────────┘ └─────────┘
└────┬────┘
┌────▼────┐ ┌──────────────┐
│backtest │ │ optimizer.py │
│回测引擎 │◄───────│ 参数优化器 │
└─────────┘ └──────────────┘
```
### 设计原则
| 原则 | 说明 |
|------|------|
| **插件化** | 策略以插件形式注册,新增策略无需修改引擎核心代码 |
| **异步优先** | 全部 I/O 操作使用 `asyncio`,保证高并发下的低延迟 |
| **类型安全** | 核心接口用 `ABC` + Pydantic 模型约束,运行时校验 |
| **无状态** | 引擎本身不持有策略状态,状态由策略实例自行管理 |
| **错误隔离** | 单个策略异常不影响其他策略运行,也不影响引擎主循环 |
| **可观测** | 每个生命周期事件埋点,输出结构化日志和 Prometheus 指标 |
---
## 策略基类设计
### 抽象基类
```python
# engine/common/base.py
from abc import ABC, abstractmethod
from typing import Optional
from pydantic import BaseModel
from .models import Ticker, Kline, OrderBook, Trade
class StrategyConfig(BaseModel):
"""策略配置(由具体策略子类化扩展)"""
name: str
symbol: str
exchange: str = "binance"
enabled: bool = True
max_position_pct: float = 0.1 # 最大仓位比例
stop_loss_pct: Optional[float] = None # 止损百分比
take_profit_pct: Optional[float] = None # 止盈百分比
class Signal(BaseModel):
"""交易信号"""
symbol: str
side: str # "BUY" / "SELL"
signal_type: str # "MARKET" / "LIMIT" / "CANCEL"
price: Optional[float] = None # 限价单价格
quantity: Optional[float] = None # 下单数量
confidence: float = 1.0 # 信号置信度 [0, 1]
reason: str = "" # 信号生成原因(便于审计)
timestamp: float # 信号时间戳
class BaseStrategy(ABC):
"""所有策略的抽象基类"""
# 策略类型标识,子类必须覆盖
strategy_type: str = "base"
def __init__(self, config: StrategyConfig):
self.config = config
self.is_running = False
self.pnl = 0.0
self.trade_count = 0
# ── 生命周期钩子 ──
async def on_start(self) -> None:
"""策略启动时调用(加载初始状态、预热等)"""
self.is_running = True
async def on_stop(self) -> None:
"""策略停止时调用(清理、平仓等)"""
self.is_running = False
async def on_pause(self) -> None:
"""策略暂停时调用"""
pass
async def on_resume(self) -> None:
"""策略恢复时调用"""
pass
# ── 行情事件处理器(子类按需实现)──
@abstractmethod
async def on_kline(self, kline: Kline) -> Optional[Signal]:
"""处理 K 线数据,返回交易信号"""
...
async def on_ticker(self, ticker: Ticker) -> Optional[Signal]:
"""处理 Ticker 数据(可选实现)"""
return None
async def on_trade(self, trade: Trade) -> Optional[Signal]:
"""处理逐笔成交数据(可选实现)"""
return None
async def on_orderbook(self, orderbook: OrderBook) -> Optional[Signal]:
"""处理深度数据(可选实现)"""
return None
# ── 交易反馈钩子 ──
async def on_order_filled(self, order_result) -> None:
"""订单成交回调"""
pass
async def on_order_cancelled(self, order_id: str) -> None:
"""订单撤销回调"""
pass
# ── 辅助方法 ──
async def get_klines(
self, interval: str, limit: int = 100
) -> list[Kline]:
"""从 TimescaleDB 获取历史 K 线(回测和预热用)"""
...
def log(self, level: str, message: str, **kwargs) -> None:
"""结构化日志"""
...
```
### 策略状态机
```
register
┌────────┐ start ┌─────────┐
│ LOADED │ ──────────► │ RUNNING │
└────────┘ └────┬─────┘
▲ │
│ ┌────────┴────────┐
│ ▼ ▼
│ ┌─────────┐ ┌──────────┐
└─────────│ PAUSED │ │ STOPPING │
unload └────┬─────┘ └────┬─────┘
│ resume │
└───► RUNNING │
┌──────────┐
│ STOPPED │
└────┬─────┘
│ unload
(removed)
```
### 策略生命周期事件流
```
on_start()
┌──────────────────────────────────────────────────────────┐
│ 主循环(事件驱动) │
│ │
│ on_kline(kline) ──► Signal? ──► signal_bus.emit(signal) │
│ on_ticker(ticker) ─► Signal? ──► signal_bus.emit(signal) │
│ on_orderbook(ob) ──► Signal? ──► signal_bus.emit(signal) │
│ │
│ on_order_filled(result) ←── executor 回调 │
│ on_order_cancelled(id) ←── executor 回调 │
└──────────────────────────────────────────────────────────┘
on_stop()
```
---
## 策略管理器
策略管理器是引擎的入口组件,负责所有策略实例的注册、发现、加载和生命周期调度。
### 接口设计
```python
# engine/manager.py
import importlib
import pkgutil
from pathlib import Path
from typing import Type, Optional
from .common.logger import logger
from .common import BaseStrategy, StrategyConfig, Signal
from .signals import SignalBus
class StrategyManager:
"""
策略管理器:策略注册、热加载、生命周期调度
用法:
manager = StrategyManager(signal_bus=signal_bus)
manager.discover_strategies("strategies")
await manager.start_all()
"""
def __init__(self, signal_bus: SignalBus):
self._strategies: dict[str, BaseStrategy] = {}
self._strategy_classes: dict[str, Type[BaseStrategy]] = {}
self.signal_bus = signal_bus
# ── 策略发现与注册 ──
def discover_strategies(self, package_path: str = "strategies") -> list[str]:
"""
自动发现策略包下的所有策略类。
约定:
- 策略文件放在 `strategies/` 目录下
- 策略类必须继承 `BaseStrategy` 且在其模块中定义
返回发现的策略类型名列表。
"""
...
def register(self, strategy_cls: Type[BaseStrategy]) -> None:
"""手动注册策略类"""
...
# ── 策略实例管理 ──
def create(
self, strategy_type: str, config: StrategyConfig
) -> BaseStrategy:
"""
根据策略类型创建实例。
Raises:
ValueError: 策略类型未注册
"""
...
async def start(self, name: str) -> None:
"""启动指定策略"""
...
async def stop(self, name: str) -> None:
"""停止指定策略"""
...
async def pause(self, name: str) -> None:
"""暂停指定策略(保留状态,不接收行情)"""
...
async def resume(self, name: str) -> None:
"""恢复暂停的策略"""
...
async def restart(self, name: str, new_config: Optional[StrategyConfig] = None) -> BaseStrategy:
"""重启策略(可选地更新配置)"""
...
def get(self, name: str) -> Optional[BaseStrategy]:
"""获取策略实例"""
...
def list_strategies(self) -> list[dict]:
"""列出所有策略的状态"""
...
# ── 批量操作 ──
async def start_all(self) -> None:
"""启动所有 enabled 策略"""
...
async def stop_all(self) -> None:
"""停止所有策略"""
...
# ── 行情入口(由主循环调用)──
async def feed_kline(self, kline) -> None:
"""将 K 线数据分发到对应 symbol 的策略"""
...
async def feed_ticker(self, ticker) -> None:
"""将 Ticker 分发到对应 symbol 的策略"""
...
async def feed_orderbook(self, orderbook) -> None:
"""将 OrderBook 分发到对应 symbol 的策略"""
...
# ── 交易反馈入口 ──
async def notify_order_filled(self, strategy_name: str, order_result) -> None:
"""通知策略订单已成交"""
...
# ── 统计与监控 ──
def stats(self) -> dict:
"""返回引擎统计信息"""
...
```
### 策略热加载机制
```
策略文件变更
┌──────────────────┐
│ 文件监听器 │
│ (watchdog) │
└────────┬─────────┘
┌──────────▼──────────┐
│ 重新 importlib.reload│
└──────────┬──────────┘
┌──────────▼──────────┐
│ 更新 _strategy_classes│
└──────────┬──────────┘
┌──────────▼──────────┐
│ 重新创建策略实例 │
│ (stop → create → start)│
└─────────────────────┘
```
---
## 信号分发器
信号分发器是策略到交易执行模块之间的**解耦层**,提供事件总线能力。
### 设计目标
- **解耦**:策略不直接调用交易 API,只 emit 信号
- **过滤**:同一 symbol 的冲突信号自动去重/合并
- **优先级**:支持信号优先级排序(高置信度优先)
- **限流**:防止策略过度交易,内置冷却时间
- **审计**:所有信号记录到数据库,可追溯
### 接口设计
```python
# engine/signals.py
import asyncio
from collections import defaultdict
from typing import Callable, Awaitable
from .common import Signal
SignalHandler = Callable[[Signal], Awaitable[None]]
class SignalBus:
"""信号事件总线"""
def __init__(self):
self._handlers: dict[str, list[SignalHandler]] = defaultdict(list)
self._signal_history: list[Signal] = []
self._cooldowns: dict[str, float] = {} # symbol -> 下次允许信号时间
def subscribe(self, event_type: str, handler: SignalHandler) -> None:
"""
订阅信号类型。
event_type: "order" / "alert" / "log" 或自定义
handler: async callable,接收 Signal 参数
"""
...
async def emit(self, signal: Signal) -> None:
"""
发布信号。会经过以下处理链:
1. 冷却检查
2. 冲突检测
3. 优先级排序
4. 分发给所有订阅者
5. 记录到审计日志
"""
...
def set_cooldown(self, symbol: str, seconds: float) -> None:
"""设置 symbol 的下单冷却时间"""
...
def recent_signals(self, symbol: str, n: int = 10) -> list[Signal]:
"""获取最近的信号历史"""
...
class SignalFilter:
"""信号过滤器基类(责任链模式)"""
async def filter(self, signal: Signal) -> Signal | None:
"""返回过滤后的信号,返回 None 表示丢弃"""
return signal
class CooldownFilter(SignalFilter):
"""冷却时间过滤器"""
...
class DuplicateFilter(SignalFilter):
"""重复信号过滤器(同一 symbol 短期内相同方向合并)"""
...
class ConfidenceFilter(SignalFilter):
"""置信度过滤器(低于阈值的信号丢弃)"""
...
```
### 信号流向
```
策略A.on_kline() ──► Signal ──┐
策略B.on_ticker() ─► Signal ──┤
策略C.on_kline() ──► Signal ──┤
┌────────────────┐
│ SignalBus │
│ │
│ ┌──────────┐ │
│ │ 冷却检查 │ │
│ └────┬─────┘ │
│ ┌────▼─────┐ │
│ │ 去重合并 │ │
│ └────┬─────┘ │
│ ┌────▼─────┐ │
│ │ 优先级排序│ │
│ └────┬─────┘ │
└───────┼────────┘
┌─────────────┼─────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ executor │ │ risk/ │ │ monitor/ │
│ 交易执行 │ │ 风控 │ │ 日志 │
└──────────┘ └──────────┘ └──────────┘
```
---
## 回测引擎
回测引擎使用历史 K 线数据模拟策略执行,评估策略在历史行情中的表现。
### 设计目标
- **事件驱动**:模拟真实行情推送,按时间顺序逐根 K 线喂给策略
- **精确模拟**:考虑手续费、滑点、最小下单量等实际交易约束
- **多维度指标**:总收益率、年化收益、夏普比率、最大回撤、胜率、盈亏比
- **可视化输出**:资金曲线图、回撤曲线、交易点标注
### 架构
```
┌──────────────────────────┐
│ 回测引擎 │
│ │
历史K线数据 ─────►│ ┌────────────────────┐ │
(TimescaleDB) │ │ BacktestEngine │ │
│ │ │ │
策略配置 ────────►│ │ - 加载历史K线 │ │
│ │ - 逐根推送给策略 │ │
策略类 ──────────►│ │ - 模拟订单成交 │ │
│ │ - 计算性能指标 │ │
│ │ - 生成回测报告 │ │
│ └────────────────────┘ │
└──────────┬───────────────┘
┌──────────────────────────┐
│ 回测结果 │
│ - equity_curve │
│ - trades │
│ - metrics │
│ - report (HTML/JSON) │
└──────────────────────────┘
```
### 接口设计
```python
# engine/backtest.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Type
from pydantic import BaseModel
from .common import BaseStrategy, StrategyConfig
@dataclass
class BacktestConfig:
"""回测配置"""
symbol: str
exchange: str = "binance"
interval: str = "1h"
start_time: datetime
end_time: datetime
# 交易成本
commission_pct: float = 0.001 # 手续费率 (0.1%)
slippage_pct: float = 0.0005 # 滑点 (0.05%)
min_order_qty: float = 0.001 # 最小下单量
# 资金
initial_capital: float = 10_000.0 # 初始资金
# 数据
warmup_bars: int = 100 # 预热 K 线条数
@dataclass
class BacktestTrade:
"""单笔回测交易记录"""
timestamp: datetime
symbol: str
side: str
price: float
quantity: float
commission: float
slippage: float
pnl: Optional[float] = None
reason: str = ""
@dataclass
class BacktestMetrics:
"""回测性能指标"""
total_return_pct: float # 总收益率
annual_return_pct: float # 年化收益率
sharpe_ratio: float # 夏普比率
sortino_ratio: float # 索提诺比率
max_drawdown_pct: float # 最大回撤
max_drawdown_duration: int # 最大回撤持续天数
win_rate: float # 胜率
profit_factor: float # 盈亏比
total_trades: int # 总交易次数
avg_holding_hours: float # 平均持仓时间(小时)
calmar_ratio: float # 卡尔玛比率
@dataclass
class BacktestResult:
"""完整回测结果"""
config: BacktestConfig
strategy_config: StrategyConfig
metrics: BacktestMetrics
trades: list[BacktestTrade]
equity_curve: list[dict] # [{timestamp, equity, drawdown}, ...]
class BacktestEngine:
"""回测引擎"""
def __init__(self, config: BacktestConfig):
self.config = config
self._klines: list = []
self._trades: list[BacktestTrade] = []
self._equity: list[dict] = []
async def load_data(self) -> None:
"""从 TimescaleDB 加载历史 K 线数据"""
...
async def run(
self,
strategy_cls: Type[BaseStrategy],
strategy_config: StrategyConfig,
) -> BacktestResult:
"""
执行回测。
流程:
1. 预热:先喂 warmup_bars 根 K 线但不产生交易
2. 回测循环:逐根 K 线推送给策略
3. 收到 Signal 时模拟订单成交
4. 每根 K 线更新资金曲线
5. 计算最终指标
"""
...
async def run_batch(
self,
strategy_cls: Type[BaseStrategy],
configs: list[StrategyConfig],
max_concurrency: int = 4,
) -> list[BacktestResult]:
"""批量回测(并行执行,用于参数扫描)"""
...
@staticmethod
def compute_metrics(
equity_curve: list[dict],
trades: list[BacktestTrade],
config: BacktestConfig,
) -> BacktestMetrics:
"""从资金曲线和交易记录计算性能指标"""
...
def generate_report(self, result: BacktestResult, format: str = "json") -> str:
"""生成回测报告(json/html/markdown"""
...
def plot(self, result: BacktestResult) -> None:
"""绘制资金曲线和回撤图(matplotlib)"""
...
```
### 回测执行流程
```
开始
┌─────────────────────┐
│ 加载历史 K 线数据 │
│ (TimescaleDB) │
└──────────┬──────────┘
┌─────────────────────┐
│ 预热阶段 │
│ 喂入 warmup_bars │
│ 策略初始化指标 │
└──────────┬──────────┘
┌─────────────────────┐
│ 回测主循环 │
│ │
│ for kline in data: │
│ signal = strat. │
│ on_kline(kline)│
│ if signal: │
│ simulate_trade │
│ update_equity() │
│ │
└──────────┬──────────┘
┌─────────────────────┐
│ 计算性能指标 │
│ - 收益率 │
│ - 夏普比率 │
│ - 最大回撤 │
│ - 胜率等 │
└──────────┬──────────┘
┌─────────────────────┐
│ 生成回测报告 │
│ - JSON 结果 │
│ - 资金曲线图 │
│ - HTML 报告 │
└─────────────────────┘
```
### 交易模拟逻辑
```
收到 Signal
┌──────────────┐
│ 输入价格 = │
│ kline.close │
│ + slippage │
└──────┬───────┘
┌──────────────┐
│ 检查余额 │──── 不足 ──► 跳过,记录日志
│ 检查最小下单量 │
└──────┬───────┘
│ 通过
┌──────────────┐
│ 执行成交 │
│ - 更新持仓 │
│ - 扣除手续费 │
│ - 记录交易 │
│ - 计算盈亏 │
└──────────────┘
```
---
## 参数优化器
参数优化器基于回测引擎,自动搜索最优策略参数组合。
### 支持的优化策略
| 方法 | 适用场景 | 特点 |
|------|---------|------|
| **网格搜索** | 参数空间小(2-3 维) | 穷举所有组合,结果完备但慢 |
| **随机搜索** | 参数空间中等 | 比网格搜索更高效 |
| **贝叶斯优化** | 参数空间大 | 利用先验知识,收敛快 |
| **遗传算法** | 复杂非线性参数空间 | 全局搜索能力强 |
### 接口设计
```python
# engine/optimizer.py
from dataclasses import dataclass, field
from typing import Callable, Any
from backtest import BacktestEngine, BacktestConfig, BacktestResult
from common import BaseStrategy, StrategyConfig
@dataclass
class ParamSpec:
"""参数规格定义"""
name: str
type: str # "int" / "float" / "choice"
low: Optional[float] = None # 数值参数下界
high: Optional[float] = None # 数值参数上界
step: Optional[float] = None # 步长(网格搜索用)
choices: Optional[list[Any]] = None # 离散选择
log_scale: bool = False # 是否对数尺度搜索
@dataclass
class OptimizationConfig:
"""优化器配置"""
method: str = "bayesian" # "grid" / "random" / "bayesian" / "genetic"
objective: str = "sharpe_ratio" # 优化目标指标
max_iterations: int = 100 # 最大迭代次数
n_jobs: int = 4 # 并行任务数
early_stopping_rounds: int = 10 # 早停轮数
cv_folds: int = 3 # 交叉验证折数
@dataclass
class OptimizationResult:
"""单次优化结果"""
params: dict[str, Any] # 最优参数
score: float # 目标函数值
all_trials: list[dict] # 所有试验记录
best_result: BacktestResult # 最优参数的回测结果
class ParamOptimizer:
"""参数优化器"""
def __init__(
self,
backtest_config: BacktestConfig,
strategy_cls: Type[BaseStrategy],
base_config: StrategyConfig,
param_specs: list[ParamSpec],
opt_config: OptimizationConfig = OptimizationConfig(),
):
...
async def optimize(self) -> OptimizationResult:
"""
执行参数优化。
返回最优参数组合及对应的回测结果。
"""
...
def suggest_params(self, n: int = 5) -> list[dict]:
"""获取 top-n 参数组合建议"""
...
def plot_importance(self) -> None:
"""绘制参数重要性图"""
...
def plot_parallel_coordinates(self) -> None:
"""绘制参数平行坐标图"""
...
```
### 优化流程
```
┌────────────────┐
│ 定义参数空间 │
│ ParamSpec[] │
└───────┬────────┘
┌────────────────┐
│ 选择优化算法 │
│ (网格/贝叶斯等) │
└───────┬────────┘
┌────────▼────────┐
│ 优化主循环 │
│ │
│ while n < max: │
│ params = │
│ suggest() │
│ result = │
│ backtest() │
│ score = │
│ metric() │
│ update() │
│ check_early()│
│ │
└────────┬────────┘
┌────────────────┐
│ 返回最优结果 │
│ + 所有试验记录 │
└────────────────┘
```
### 目标函数
```
目标函数可选指标(最小化回撤时取负值):
sharpe_ratio — 夏普比率(风险调整后收益,推荐)
sortino_ratio — 索提诺比率(只考虑下行波动)
total_return — 总收益率
calmar_ratio — 卡尔玛比率(收益/最大回撤)
profit_factor — 盈亏比
custom — 自定义组合指标
```
---
## 数据流与生命周期
### 实时交易数据流
```
Redis Pub/Sub engine/ executor/
───────────── ───────── ─────────
market:kline:* ──► RedisConsumer ──► Manager.feed_kline()
路由到对应 symbol 策略
┌─────┴─────┐
▼ ▼
策略A.on_kline 策略B.on_kline
│ │
Signal? Signal?
│ │
└─────┬─────┘
SignalBus.emit()
┌───────────┼───────────┐
▼ ▼ ▼
冷却检查 去重合并 优先级排序
│ │ │
└───────────┼───────────┘
┌──────────┐
│ risk/ │ ← 风控检查
│ manager │
└────┬─────┘
│ passed
┌──────────┐
│ executor │ ← 实际下单
└──────────┘
订单状态回调
Manager.notify_order_filled()
策略.on_order_filled()
```
### 回测数据流
```
TimescaleDB ──► BacktestEngine.load_data()
历史 K 线列表
BacktestEngine.run()
逐根推送 K 线 ──► 策略.on_kline()
│ │
│ Signal?
│ │
▼ ▼
更新资金曲线 simulate_trade()
计算指标 ──► BacktestResult
```
### 引擎主循环
```python
# engine/main.py — 主程序入口示意
import asyncio
from config import settings
from common.storage import TimescaleDBReader
from manager import StrategyManager
from signals import SignalBus
async def main():
# 1. 初始化信号总线
signal_bus = SignalBus()
# 2. 初始化策略管理器
manager = StrategyManager(signal_bus)
manager.discover_strategies("strategies")
# 3. 创建并注册策略实例
for cfg in settings.strategies:
manager.create(cfg.type, cfg)
# 4. 启动所有策略
await manager.start_all()
# 5. 订阅 Redis 行情
consumer = RedisMarketConsumer(settings.redis_url)
await consumer.subscribe_kline(manager.feed_kline)
await consumer.subscribe_ticker(manager.feed_ticker)
# 6. 保持运行
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
```
---
## 与上下游模块的交互
### 上游:TypeScript 数据模块
引擎通过 Redis Pub/Sub 消费 TS 侧推送的行情数据:
| Redis 频道 | 数据类型 | 对应策略方法 |
|-----------|---------|-------------|
| `market:ticker:{exchange}:{symbol}` | Ticker | `on_ticker()` |
| `market:kline:{exchange}:{symbol}:{interval}` | Kline | `on_kline()` |
| `market:trade:{exchange}:{symbol}` | Trade | `on_trade()` |
| `market:orderbook:{exchange}:{symbol}` | OrderBook | `on_orderbook()` |
数据格式使用 **MessagePack** 序列化,Python 侧用 `msgpack.unpackb()` 反序列化为 Pydantic 模型。
### 下游:交易执行模块
引擎通过 `SignalBus` 发布信号,executor 订阅:
```python
# executor 侧订阅
signal_bus.subscribe("order", executor.handle_signal)
```
### 下游:风控模块
风控模块也订阅信号总线,在 executor 执行前进行风控检查:
```python
# risk 侧订阅
signal_bus.subscribe("order", risk_manager.check_signal)
```
### 下游:监控模块
监控模块订阅所有信号用于记录和统计:
```python
# monitor 侧订阅
signal_bus.subscribe("*", monitor.record_signal)
```
---
## 目录结构
```
engine/
├── ENGINE.md # 本文档
├── __init__.py
├── env.yaml # 引擎环境配置
├── common/ # 通用模块(基础数据类型、策略基类、日志)
│ ├── __init__.py
│ ├── base.py # BaseStrategy 抽象基类 + Signal / StrategyConfig
│ ├── models.py # 数据模型:Ticker / Kline / Trade / OrderBook
│ └── logger.py # 结构化日志模块
├── manager.py # StrategyManager 策略管理器
├── signals.py # SignalBus 信号分发器 + SignalFilter 过滤器
├── backtest.py # BacktestEngine 回测引擎
├── optimizer.py # ParamOptimizer 参数优化器
├── main.py # 引擎主循环入口
├── indicators/ # 技术指标计算(基于 TA-Lib / pandas_ta
│ ├── __init__.py
│ ├── trend.py # 趋势指标 (MA, EMA, MACD, ADX...)
│ ├── momentum.py # 动量指标 (RSI, Stochastic, CCI...)
│ ├── volatility.py # 波动率指标 (Bollinger, ATR, Keltner...)
│ ├── volume.py # 成交量指标 (OBV, VWAP, MFI...)
│ └── composite.py # 复合指标(自定义组合)
├── consumers/ # 行情消费器(Redis → 策略馈送)
│ ├── __init__.py
│ ├── redis.py # Redis Pub/Sub 消费者
│ └── nats.py # NATS 消费者(可选)
└── tests/ # 引擎测试
├── __init__.py
├── test_base.py
├── test_manager.py
├── test_signals.py
├── test_backtest.py
├── test_optimizer.py
└── fixtures/ # 测试夹具(模拟行情数据)
└── sample_klines.json
```
---
## 开发步骤
### 第一阶段:核心框架(第 1 周)
| 步骤 | 任务 | 产出 |
|------|------|------|
| 1.1 | 实现 `common/base.py``BaseStrategy``Signal``StrategyConfig` | 策略基类和类型定义 |
| 1.2 | 实现 `common/models.py``Ticker``Kline``Trade``OrderBook` Pydantic 模型 | 与 TS 对齐的数据模型 |
| 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 |
| 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 |
| 1.4 | 实现 `signals.py``SignalBus``CooldownFilter``DuplicateFilter` | 事件总线和过滤链 |
| 1.5 | 实现 `manager.py` — 策略注册、创建、启停 | 策略管理器 |
| 1.6 | 实现 `consumers/redis.py` — Redis Pub/Sub 消费者 | 实时行情接收 |
| 1.7 | 编写 `main.py` — 引擎主循环串联 | 端到端可运行 |
### 第二阶段:指标与策略示例(第 2 周)
| 步骤 | 任务 | 产出 |
|------|------|------|
| 2.1 | 实现 `indicators/` — MA、EMA、MACD、RSI、Bollinger | 技术指标库 |
| 2.2 | 编写示例策略 1:双均线交叉 | `strategies/ma_cross.py` |
| 2.3 | 编写示例策略 2:网格交易 | `strategies/grid_trading.py` |
| 2.4 | 集成测试:Redis 行情 → 策略 → 信号 | 端到端验证 |
### 第三阶段:回测(第 3 周)
| 步骤 | 任务 | 产出 |
|------|------|------|
| 3.1 | 实现 `backtest.py` — 回测引擎核心 | 事件驱动回测 |
| 3.2 | 实现交易模拟逻辑(手续费、滑点) | 精确回测 |
| 3.3 | 实现性能指标计算(夏普比率、最大回撤等) | 完整指标 |
| 3.4 | 实现回测报告生成(JSON/HTML) | 可视化报告 |
| 3.5 | 编写回测测试用例 | 回测验证 |
### 第四阶段:参数优化(第 4 周)
| 步骤 | 任务 | 产出 |
|------|------|------|
| 4.1 | 实现 `optimizer.py` — 网格搜索 | 穷举搜索 |
| 4.2 | 集成 Optuna 实现贝叶斯优化 | 智能搜索 |
| 4.3 | 实现交叉验证 | 防止过拟合 |
| 4.4 | 实现参数重要性分析 | 参数洞察 |
---
## 实现规范
### 异步编程规范
- 所有 I/O 操作使用 `async/await`
- 长时间计算任务使用 `asyncio.to_thread()` 放入线程池
- 避免在协程中阻塞(不用 `time.sleep`,用 `asyncio.sleep`
- 策略的 `on_*` 方法必须是非阻塞的快速返回
### 错误处理
```python
# 策略错误隔离模式
async def feed_kline(self, kline) -> None:
for name, strategy in self._strategies.items():
if not strategy.is_running:
continue
if strategy.config.symbol != kline.symbol:
continue
try:
signal = await strategy.on_kline(kline)
if signal:
await self.signal_bus.emit(signal)
except Exception as e:
logger.error(f"Strategy {name} error: {e}", exc_info=True)
# 单策略异常不影响其他策略
```
### 日志规范
```python
from common.logger import logger
# 使用结构化日志
logger.info("strategy_signal", strategy="ma_cross", symbol="BTCUSDT",
side="BUY", confidence=0.85, timestamp=1234567890)
logger.warning("strategy_error", strategy="grid", error=str(e))
logger.info("backtest_complete", symbol="ETHUSDT", sharpe=1.45, trades=230)
```
### 类型规范
- 所有公开接口使用类型注解
- 数据模型使用 Pydantic `BaseModel`
- 策略配置继承 `StrategyConfig` 并扩展
- 信号必须通过 `Signal` 模型创建,禁止使用裸 dict
### 测试规范
- 核心逻辑(SignalBus、StrategyManager)单元测试覆盖率 > 90%
- 策略逻辑有独立单元测试
- 回测引擎用已知结果的静态数据集验证
- 使用 `pytest-asyncio` 支持异步测试
### 性能基准
| 指标 | 目标值 | 说明 |
|------|--------|------|
| 策略单次 `on_kline` 耗时 | < 1ms | 不含 I/O 的纯计算 |
| SignalBus 发送延迟 | < 0.1ms | emit 到 handler 开始执行 |
| 回测速度 | > 10000 bars/s | 10 万根 K 线 < 10 秒 |
| 100 策略并行 | CPU < 50% | 空策略情况下的资源占用 |
---
## 附录
### A. 示例策略实现(双均线交叉)
```python
# strategies/ma_cross.py
from typing import Optional
from common import BaseStrategy, StrategyConfig, Signal
class MACrossConfig(StrategyConfig):
fast_period: int = 7
slow_period: int = 25
class MACrossStrategy(BaseStrategy):
strategy_type = "ma_cross"
def __init__(self, config: MACrossConfig):
super().__init__(config)
self.config: MACrossConfig = config
self._prices: list[float] = []
self._last_signal: Optional[str] = None # "BUY" / "SELL"
def _sma(self, data: list[float], period: int) -> float:
if len(data) < period:
return 0.0
return sum(data[-period:]) / period
async def on_kline(self, kline) -> Optional[Signal]:
self._prices.append(kline.close)
fast = self._sma(self._prices, self.config.fast_period)
slow = self._sma(self._prices, self.config.slow_period)
if fast == 0 or slow == 0:
return None
if fast > slow and self._last_signal != "BUY":
self._last_signal = "BUY"
return Signal(
symbol=self.config.symbol,
side="BUY",
signal_type="MARKET",
confidence=0.7,
reason=f"Golden cross: MA{self.config.fast_period} > MA{self.config.slow_period}",
timestamp=kline.timestamp,
)
if fast < slow and self._last_signal != "SELL":
self._last_signal = "SELL"
return Signal(
symbol=self.config.symbol,
side="SELL",
signal_type="MARKET",
confidence=0.7,
reason=f"Death cross: MA{self.config.fast_period} < MA{self.config.slow_period}",
timestamp=kline.timestamp,
)
return None
```
### B. 引擎配置示例(engine/env.yaml
```yaml
# engine/env.yaml
engine:
name: "trade-engine"
strategies_package: "strategies"
max_concurrent_strategies: 50
redis:
url: "redis://localhost:6379/0"
channels:
kline: "market:kline:*"
ticker: "market:ticker:*"
orderbook: "market:orderbook:*"
signals:
cooldown_seconds: 1.0 # 同 symbol 下单冷却时间
min_confidence: 0.3 # 最低信号置信度
max_signals_per_minute: 60 # 每分钟最大信号数
backtest:
default_commission: 0.001
default_slippage: 0.0005
default_capital: 10000.0
optimizer:
default_method: "bayesian"
default_objective: "sharpe_ratio"
max_iterations: 200
n_jobs: 4
early_stopping: 20
```
### C. 相关文档索引
| 文档 | 路径 |
|------|------|
| 系统总体架构 | `../README.md` |
| 数据模块开发指南 | `../data/ENGINE.md` (待创建) |
| 交易执行模块 | `../executor/ENGINE.md` (待创建) |
| 风控模块 | `../risk/ENGINE.md` (待创建) |
| 公共模块 | `./common/__init__.py` |