# 策略引擎开发文档 > 数字货币量化交易系统 — 策略引擎模块(Python 3.10+)设计与开发指南 --- ## 目录 1. [概述与定位](#概述与定位) 2. [核心架构](#核心架构) 3. [策略基类设计](#策略基类设计) 4. [策略管理器](#策略管理器) 5. [信号分发器](#信号分发器) 6. [回测引擎](#回测引擎) 7. [参数优化器](#参数优化器) 8. [数据流与生命周期](#数据流与生命周期) 9. [与上下游模块的交互](#与上下游模块的交互) 10. [目录结构](#目录结构) 11. [开发步骤](#开发步骤) 12. [实现规范](#实现规范) --- ## 概述与定位 策略引擎是整个量化交易系统的**核心调度中心**,位于 Python 业务层,负责: - **策略生命周期管理**:注册、热加载、启动、暂停、停止、卸载策略 - **实时信号生成**:消费 TS 数据模块推送的行情数据,驱动策略产生交易信号 - **回测验证**:使用历史数据模拟策略执行,评估收益、回撤、夏普比率等指标 - **参数搜索**:通过网格搜索或贝叶斯优化自动寻找最优策略参数 - **下游集成**:将交易信号通过信号分发器路由到交易执行模块和风控模块 ### 在系统中的位置 ``` ┌──────────────────────────┐ │ 策略引擎 🐍 Python │ │ │ Redis Pub/Sub ──────► │ ┌────────────────────┐ │ 行情数据 │ │ 信号分发器 │──► executor/ │ └────────────────────┘ │ TimescaleDB ◄─────── │ ┌────────────────────┐ │ 历史数据(回测) │ │ 策略管理器 │ │ │ │ ┌──────┬──────┐ │ │ │ │ │策略A │策略B │ │ │ │ │ └──────┴──────┘ │ │ │ └────────────────────┘ │ │ ┌────────────────────┐ │ │ │ 回测引擎 │ │ │ └────────────────────┘ │ │ ┌────────────────────┐ │ │ │ 参数优化器 │ │ │ └────────────────────┘ │ └──────────────────────────┘ ``` --- ## 核心架构 ### 模块依赖图 ``` ┌──────────────────┐ │ engine/common/ │ ← 配置、日志、数据模型、策略基类 └────────┬─────────┘ │ ┌────────────────────┼────────────────────┐ │ │ │ ┌────▼────┐ ┌──────▼──────┐ ┌─────▼──────┐ │ base.py │ │ manager.py │ │ consumers/ │ │策略基类 │◄───────│ 策略管理器 │ │ 行情消费 │ └────┬────┘ └──────┬──────┘ └────────────┘ │ │ │ ┌──────▼──────┐ ┌────▼────┐ │ signals.py │ ┌─────────┐ │strategies│ │ 信号分发器 │─────►│executor/│ │ 策略实现 │ └─────────────┘ └─────────┘ └────┬────┘ │ ┌────▼────┐ ┌──────────────┐ │backtest │ │ optimizer.py │ │回测引擎 │◄───────│ 参数优化器 │ └─────────┘ └──────────────┘ ``` ### 设计原则 | 原则 | 说明 | |------|------| | **插件化** | 策略以插件形式注册,新增策略无需修改引擎核心代码 | | **异步优先** | 全部 I/O 操作使用 `asyncio`,保证高并发下的低延迟 | | **类型安全** | 核心接口用 `ABC` + Pydantic 模型约束,运行时校验 | | **无状态** | 引擎本身不持有策略状态,状态由策略实例自行管理 | | **错误隔离** | 单个策略异常不影响其他策略运行,也不影响引擎主循环 | | **可观测** | 每个生命周期事件埋点,输出结构化日志和 Prometheus 指标 | --- ## 策略基类设计 ### 抽象基类 ```python # engine/common/base.py from abc import ABC, abstractmethod from typing import Optional from pydantic import BaseModel from .models import Ticker, Kline, OrderBook, Trade class StrategyConfig(BaseModel): """策略配置(由具体策略子类化扩展)""" name: str symbol: str exchange: str = "binance" enabled: bool = True max_position_pct: float = 0.1 # 最大仓位比例 stop_loss_pct: Optional[float] = None # 止损百分比 take_profit_pct: Optional[float] = None # 止盈百分比 class Signal(BaseModel): """交易信号""" symbol: str side: str # "BUY" / "SELL" signal_type: str # "MARKET" / "LIMIT" / "CANCEL" price: Optional[float] = None # 限价单价格 quantity: Optional[float] = None # 下单数量 confidence: float = 1.0 # 信号置信度 [0, 1] reason: str = "" # 信号生成原因(便于审计) timestamp: float # 信号时间戳 class BaseStrategy(ABC): """所有策略的抽象基类""" # 策略类型标识,子类必须覆盖 strategy_type: str = "base" def __init__(self, config: StrategyConfig): self.config = config self.is_running = False self.pnl = 0.0 self.trade_count = 0 # ── 生命周期钩子 ── async def on_start(self) -> None: """策略启动时调用(加载初始状态、预热等)""" self.is_running = True async def on_stop(self) -> None: """策略停止时调用(清理、平仓等)""" self.is_running = False async def on_pause(self) -> None: """策略暂停时调用""" pass async def on_resume(self) -> None: """策略恢复时调用""" pass # ── 行情事件处理器(子类按需实现)── @abstractmethod async def on_kline(self, kline: Kline) -> Optional[Signal]: """处理 K 线数据,返回交易信号""" ... async def on_ticker(self, ticker: Ticker) -> Optional[Signal]: """处理 Ticker 数据(可选实现)""" return None async def on_trade(self, trade: Trade) -> Optional[Signal]: """处理逐笔成交数据(可选实现)""" return None async def on_orderbook(self, orderbook: OrderBook) -> Optional[Signal]: """处理深度数据(可选实现)""" return None # ── 交易反馈钩子 ── async def on_order_filled(self, order_result) -> None: """订单成交回调""" pass async def on_order_cancelled(self, order_id: str) -> None: """订单撤销回调""" pass # ── 辅助方法 ── async def get_klines( self, interval: str, limit: int = 100 ) -> list[Kline]: """从 TimescaleDB 获取历史 K 线(回测和预热用)""" ... def log(self, level: str, message: str, **kwargs) -> None: """结构化日志""" ... ``` ### 策略状态机 ``` register │ ▼ ┌────────┐ start ┌─────────┐ │ LOADED │ ──────────► │ RUNNING │ └────────┘ └────┬─────┘ ▲ │ │ ┌────────┴────────┐ │ ▼ ▼ │ ┌─────────┐ ┌──────────┐ └─────────│ PAUSED │ │ STOPPING │ unload └────┬─────┘ └────┬─────┘ │ resume │ └───► RUNNING │ ▼ ┌──────────┐ │ STOPPED │ └────┬─────┘ │ unload ▼ (removed) ``` ### 策略生命周期事件流 ``` on_start() │ ▼ ┌──────────────────────────────────────────────────────────┐ │ 主循环(事件驱动) │ │ │ │ on_kline(kline) ──► Signal? ──► signal_bus.emit(signal) │ │ on_ticker(ticker) ─► Signal? ──► signal_bus.emit(signal) │ │ on_orderbook(ob) ──► Signal? ──► signal_bus.emit(signal) │ │ │ │ on_order_filled(result) ←── executor 回调 │ │ on_order_cancelled(id) ←── executor 回调 │ └──────────────────────────────────────────────────────────┘ │ ▼ on_stop() ``` --- ## 策略管理器 策略管理器是引擎的入口组件,负责所有策略实例的注册、发现、加载和生命周期调度。 ### 接口设计 ```python # engine/manager.py import importlib import pkgutil from pathlib import Path from typing import Type, Optional from .common.logger import logger from .common import BaseStrategy, StrategyConfig, Signal from .signals import SignalBus class StrategyManager: """ 策略管理器:策略注册、热加载、生命周期调度 用法: manager = StrategyManager(signal_bus=signal_bus) manager.discover_strategies("strategies") await manager.start_all() """ def __init__(self, signal_bus: SignalBus): self._strategies: dict[str, BaseStrategy] = {} self._strategy_classes: dict[str, Type[BaseStrategy]] = {} self.signal_bus = signal_bus # ── 策略发现与注册 ── def discover_strategies(self, package_path: str = "strategies") -> list[str]: """ 自动发现策略包下的所有策略类。 约定: - 策略文件放在 `strategies/` 目录下 - 策略类必须继承 `BaseStrategy` 且在其模块中定义 返回发现的策略类型名列表。 """ ... def register(self, strategy_cls: Type[BaseStrategy]) -> None: """手动注册策略类""" ... # ── 策略实例管理 ── def create( self, strategy_type: str, config: StrategyConfig ) -> BaseStrategy: """ 根据策略类型创建实例。 Raises: ValueError: 策略类型未注册 """ ... async def start(self, name: str) -> None: """启动指定策略""" ... async def stop(self, name: str) -> None: """停止指定策略""" ... async def pause(self, name: str) -> None: """暂停指定策略(保留状态,不接收行情)""" ... async def resume(self, name: str) -> None: """恢复暂停的策略""" ... async def restart(self, name: str, new_config: Optional[StrategyConfig] = None) -> BaseStrategy: """重启策略(可选地更新配置)""" ... def get(self, name: str) -> Optional[BaseStrategy]: """获取策略实例""" ... def list_strategies(self) -> list[dict]: """列出所有策略的状态""" ... # ── 批量操作 ── async def start_all(self) -> None: """启动所有 enabled 策略""" ... async def stop_all(self) -> None: """停止所有策略""" ... # ── 行情入口(由主循环调用)── async def feed_kline(self, kline) -> None: """将 K 线数据分发到对应 symbol 的策略""" ... async def feed_ticker(self, ticker) -> None: """将 Ticker 分发到对应 symbol 的策略""" ... async def feed_orderbook(self, orderbook) -> None: """将 OrderBook 分发到对应 symbol 的策略""" ... # ── 交易反馈入口 ── async def notify_order_filled(self, strategy_name: str, order_result) -> None: """通知策略订单已成交""" ... # ── 统计与监控 ── def stats(self) -> dict: """返回引擎统计信息""" ... ``` ### 策略热加载机制 ``` 策略文件变更 │ ▼ ┌──────────────────┐ │ 文件监听器 │ │ (watchdog) │ └────────┬─────────┘ │ ┌──────────▼──────────┐ │ 重新 importlib.reload│ └──────────┬──────────┘ │ ┌──────────▼──────────┐ │ 更新 _strategy_classes│ └──────────┬──────────┘ │ ┌──────────▼──────────┐ │ 重新创建策略实例 │ │ (stop → create → start)│ └─────────────────────┘ ``` --- ## 信号分发器 信号分发器是策略到交易执行模块之间的**解耦层**,提供事件总线能力。 ### 设计目标 - **解耦**:策略不直接调用交易 API,只 emit 信号 - **过滤**:同一 symbol 的冲突信号自动去重/合并 - **优先级**:支持信号优先级排序(高置信度优先) - **限流**:防止策略过度交易,内置冷却时间 - **审计**:所有信号记录到数据库,可追溯 ### 接口设计 ```python # engine/signals.py import asyncio from collections import defaultdict from typing import Callable, Awaitable from .common import Signal SignalHandler = Callable[[Signal], Awaitable[None]] class SignalBus: """信号事件总线""" def __init__(self): self._handlers: dict[str, list[SignalHandler]] = defaultdict(list) self._signal_history: list[Signal] = [] self._cooldowns: dict[str, float] = {} # symbol -> 下次允许信号时间 def subscribe(self, event_type: str, handler: SignalHandler) -> None: """ 订阅信号类型。 event_type: "order" / "alert" / "log" 或自定义 handler: async callable,接收 Signal 参数 """ ... async def emit(self, signal: Signal) -> None: """ 发布信号。会经过以下处理链: 1. 冷却检查 2. 冲突检测 3. 优先级排序 4. 分发给所有订阅者 5. 记录到审计日志 """ ... def set_cooldown(self, symbol: str, seconds: float) -> None: """设置 symbol 的下单冷却时间""" ... def recent_signals(self, symbol: str, n: int = 10) -> list[Signal]: """获取最近的信号历史""" ... class SignalFilter: """信号过滤器基类(责任链模式)""" async def filter(self, signal: Signal) -> Signal | None: """返回过滤后的信号,返回 None 表示丢弃""" return signal class CooldownFilter(SignalFilter): """冷却时间过滤器""" ... class DuplicateFilter(SignalFilter): """重复信号过滤器(同一 symbol 短期内相同方向合并)""" ... class ConfidenceFilter(SignalFilter): """置信度过滤器(低于阈值的信号丢弃)""" ... ``` ### 信号流向 ``` 策略A.on_kline() ──► Signal ──┐ 策略B.on_ticker() ─► Signal ──┤ 策略C.on_kline() ──► Signal ──┤ ▼ ┌────────────────┐ │ SignalBus │ │ │ │ ┌──────────┐ │ │ │ 冷却检查 │ │ │ └────┬─────┘ │ │ ┌────▼─────┐ │ │ │ 去重合并 │ │ │ └────┬─────┘ │ │ ┌────▼─────┐ │ │ │ 优先级排序│ │ │ └────┬─────┘ │ └───────┼────────┘ │ ┌─────────────┼─────────────┐ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ executor │ │ risk/ │ │ monitor/ │ │ 交易执行 │ │ 风控 │ │ 日志 │ └──────────┘ └──────────┘ └──────────┘ ``` --- ## 回测引擎 回测引擎使用历史 K 线数据模拟策略执行,评估策略在历史行情中的表现。 ### 设计目标 - **事件驱动**:模拟真实行情推送,按时间顺序逐根 K 线喂给策略 - **精确模拟**:考虑手续费、滑点、最小下单量等实际交易约束 - **多维度指标**:总收益率、年化收益、夏普比率、最大回撤、胜率、盈亏比 - **可视化输出**:资金曲线图、回撤曲线、交易点标注 ### 架构 ``` ┌──────────────────────────┐ │ 回测引擎 │ │ │ 历史K线数据 ─────►│ ┌────────────────────┐ │ (TimescaleDB) │ │ BacktestEngine │ │ │ │ │ │ 策略配置 ────────►│ │ - 加载历史K线 │ │ │ │ - 逐根推送给策略 │ │ 策略类 ──────────►│ │ - 模拟订单成交 │ │ │ │ - 计算性能指标 │ │ │ │ - 生成回测报告 │ │ │ └────────────────────┘ │ └──────────┬───────────────┘ │ ▼ ┌──────────────────────────┐ │ 回测结果 │ │ - equity_curve │ │ - trades │ │ - metrics │ │ - report (HTML/JSON) │ └──────────────────────────┘ ``` ### 接口设计 ```python # engine/backtest.py from dataclasses import dataclass, field from datetime import datetime from typing import Optional, Type from pydantic import BaseModel from .common import BaseStrategy, StrategyConfig @dataclass class BacktestConfig: """回测配置""" symbol: str exchange: str = "binance" interval: str = "1h" start_time: datetime end_time: datetime # 交易成本 commission_pct: float = 0.001 # 手续费率 (0.1%) slippage_pct: float = 0.0005 # 滑点 (0.05%) min_order_qty: float = 0.001 # 最小下单量 # 资金 initial_capital: float = 10_000.0 # 初始资金 # 数据 warmup_bars: int = 100 # 预热 K 线条数 @dataclass class BacktestTrade: """单笔回测交易记录""" timestamp: datetime symbol: str side: str price: float quantity: float commission: float slippage: float pnl: Optional[float] = None reason: str = "" @dataclass class BacktestMetrics: """回测性能指标""" total_return_pct: float # 总收益率 annual_return_pct: float # 年化收益率 sharpe_ratio: float # 夏普比率 sortino_ratio: float # 索提诺比率 max_drawdown_pct: float # 最大回撤 max_drawdown_duration: int # 最大回撤持续天数 win_rate: float # 胜率 profit_factor: float # 盈亏比 total_trades: int # 总交易次数 avg_holding_hours: float # 平均持仓时间(小时) calmar_ratio: float # 卡尔玛比率 @dataclass class BacktestResult: """完整回测结果""" config: BacktestConfig strategy_config: StrategyConfig metrics: BacktestMetrics trades: list[BacktestTrade] equity_curve: list[dict] # [{timestamp, equity, drawdown}, ...] class BacktestEngine: """回测引擎""" def __init__(self, config: BacktestConfig): self.config = config self._klines: list = [] self._trades: list[BacktestTrade] = [] self._equity: list[dict] = [] async def load_data(self) -> None: """从 TimescaleDB 加载历史 K 线数据""" ... async def run( self, strategy_cls: Type[BaseStrategy], strategy_config: StrategyConfig, ) -> BacktestResult: """ 执行回测。 流程: 1. 预热:先喂 warmup_bars 根 K 线但不产生交易 2. 回测循环:逐根 K 线推送给策略 3. 收到 Signal 时模拟订单成交 4. 每根 K 线更新资金曲线 5. 计算最终指标 """ ... async def run_batch( self, strategy_cls: Type[BaseStrategy], configs: list[StrategyConfig], max_concurrency: int = 4, ) -> list[BacktestResult]: """批量回测(并行执行,用于参数扫描)""" ... @staticmethod def compute_metrics( equity_curve: list[dict], trades: list[BacktestTrade], config: BacktestConfig, ) -> BacktestMetrics: """从资金曲线和交易记录计算性能指标""" ... def generate_report(self, result: BacktestResult, format: str = "json") -> str: """生成回测报告(json/html/markdown)""" ... def plot(self, result: BacktestResult) -> None: """绘制资金曲线和回撤图(matplotlib)""" ... ``` ### 回测执行流程 ``` 开始 │ ▼ ┌─────────────────────┐ │ 加载历史 K 线数据 │ │ (TimescaleDB) │ └──────────┬──────────┘ │ ▼ ┌─────────────────────┐ │ 预热阶段 │ │ 喂入 warmup_bars │ │ 策略初始化指标 │ └──────────┬──────────┘ │ ▼ ┌─────────────────────┐ │ 回测主循环 │ │ │ │ for kline in data: │ │ signal = strat. │ │ on_kline(kline)│ │ if signal: │ │ simulate_trade │ │ update_equity() │ │ │ └──────────┬──────────┘ │ ▼ ┌─────────────────────┐ │ 计算性能指标 │ │ - 收益率 │ │ - 夏普比率 │ │ - 最大回撤 │ │ - 胜率等 │ └──────────┬──────────┘ │ ▼ ┌─────────────────────┐ │ 生成回测报告 │ │ - JSON 结果 │ │ - 资金曲线图 │ │ - HTML 报告 │ └─────────────────────┘ ``` ### 交易模拟逻辑 ``` 收到 Signal │ ▼ ┌──────────────┐ │ 输入价格 = │ │ kline.close │ │ + slippage │ └──────┬───────┘ │ ▼ ┌──────────────┐ │ 检查余额 │──── 不足 ──► 跳过,记录日志 │ 检查最小下单量 │ └──────┬───────┘ │ 通过 ▼ ┌──────────────┐ │ 执行成交 │ │ - 更新持仓 │ │ - 扣除手续费 │ │ - 记录交易 │ │ - 计算盈亏 │ └──────────────┘ ``` --- ## 参数优化器 参数优化器基于回测引擎,自动搜索最优策略参数组合。 ### 支持的优化策略 | 方法 | 适用场景 | 特点 | |------|---------|------| | **网格搜索** | 参数空间小(2-3 维) | 穷举所有组合,结果完备但慢 | | **随机搜索** | 参数空间中等 | 比网格搜索更高效 | | **贝叶斯优化** | 参数空间大 | 利用先验知识,收敛快 | | **遗传算法** | 复杂非线性参数空间 | 全局搜索能力强 | ### 接口设计 ```python # engine/optimizer.py from dataclasses import dataclass, field from typing import Callable, Any from backtest import BacktestEngine, BacktestConfig, BacktestResult from common import BaseStrategy, StrategyConfig @dataclass class ParamSpec: """参数规格定义""" name: str type: str # "int" / "float" / "choice" low: Optional[float] = None # 数值参数下界 high: Optional[float] = None # 数值参数上界 step: Optional[float] = None # 步长(网格搜索用) choices: Optional[list[Any]] = None # 离散选择 log_scale: bool = False # 是否对数尺度搜索 @dataclass class OptimizationConfig: """优化器配置""" method: str = "bayesian" # "grid" / "random" / "bayesian" / "genetic" objective: str = "sharpe_ratio" # 优化目标指标 max_iterations: int = 100 # 最大迭代次数 n_jobs: int = 4 # 并行任务数 early_stopping_rounds: int = 10 # 早停轮数 cv_folds: int = 3 # 交叉验证折数 @dataclass class OptimizationResult: """单次优化结果""" params: dict[str, Any] # 最优参数 score: float # 目标函数值 all_trials: list[dict] # 所有试验记录 best_result: BacktestResult # 最优参数的回测结果 class ParamOptimizer: """参数优化器""" def __init__( self, backtest_config: BacktestConfig, strategy_cls: Type[BaseStrategy], base_config: StrategyConfig, param_specs: list[ParamSpec], opt_config: OptimizationConfig = OptimizationConfig(), ): ... async def optimize(self) -> OptimizationResult: """ 执行参数优化。 返回最优参数组合及对应的回测结果。 """ ... def suggest_params(self, n: int = 5) -> list[dict]: """获取 top-n 参数组合建议""" ... def plot_importance(self) -> None: """绘制参数重要性图""" ... def plot_parallel_coordinates(self) -> None: """绘制参数平行坐标图""" ... ``` ### 优化流程 ``` ┌────────────────┐ │ 定义参数空间 │ │ ParamSpec[] │ └───────┬────────┘ │ ▼ ┌────────────────┐ │ 选择优化算法 │ │ (网格/贝叶斯等) │ └───────┬────────┘ │ ┌────────▼────────┐ │ 优化主循环 │ │ │ │ while n < max: │ │ params = │ │ suggest() │ │ result = │ │ backtest() │ │ score = │ │ metric() │ │ update() │ │ check_early()│ │ │ └────────┬────────┘ │ ▼ ┌────────────────┐ │ 返回最优结果 │ │ + 所有试验记录 │ └────────────────┘ ``` ### 目标函数 ``` 目标函数可选指标(最小化回撤时取负值): sharpe_ratio — 夏普比率(风险调整后收益,推荐) sortino_ratio — 索提诺比率(只考虑下行波动) total_return — 总收益率 calmar_ratio — 卡尔玛比率(收益/最大回撤) profit_factor — 盈亏比 custom — 自定义组合指标 ``` --- ## 数据流与生命周期 ### 实时交易数据流 ``` Redis Pub/Sub engine/ executor/ ───────────── ───────── ───────── market:kline:* ──► RedisConsumer ──► Manager.feed_kline() │ ▼ 路由到对应 symbol 策略 │ ┌─────┴─────┐ ▼ ▼ 策略A.on_kline 策略B.on_kline │ │ Signal? Signal? │ │ └─────┬─────┘ ▼ SignalBus.emit() │ ┌───────────┼───────────┐ ▼ ▼ ▼ 冷却检查 去重合并 优先级排序 │ │ │ └───────────┼───────────┘ ▼ ┌──────────┐ │ risk/ │ ← 风控检查 │ manager │ └────┬─────┘ │ passed ▼ ┌──────────┐ │ executor │ ← 实际下单 └──────────┘ │ ▼ 订单状态回调 │ ▼ Manager.notify_order_filled() │ ▼ 策略.on_order_filled() ``` ### 回测数据流 ``` TimescaleDB ──► BacktestEngine.load_data() │ ▼ 历史 K 线列表 │ ▼ BacktestEngine.run() │ ▼ 逐根推送 K 线 ──► 策略.on_kline() │ │ │ Signal? │ │ ▼ ▼ 更新资金曲线 simulate_trade() │ ▼ 计算指标 ──► BacktestResult ``` ### 引擎主循环 ```python # engine/main.py — 主程序入口示意 import asyncio from config import settings from common.storage import TimescaleDBReader from manager import StrategyManager from signals import SignalBus async def main(): # 1. 初始化信号总线 signal_bus = SignalBus() # 2. 初始化策略管理器 manager = StrategyManager(signal_bus) manager.discover_strategies("strategies") # 3. 创建并注册策略实例 for cfg in settings.strategies: manager.create(cfg.type, cfg) # 4. 启动所有策略 await manager.start_all() # 5. 订阅 Redis 行情 consumer = RedisMarketConsumer(settings.redis_url) await consumer.subscribe_kline(manager.feed_kline) await consumer.subscribe_ticker(manager.feed_ticker) # 6. 保持运行 await asyncio.Event().wait() if __name__ == "__main__": asyncio.run(main()) ``` --- ## 与上下游模块的交互 ### 上游:TypeScript 数据模块 引擎通过 Redis Pub/Sub 消费 TS 侧推送的行情数据: | Redis 频道 | 数据类型 | 对应策略方法 | |-----------|---------|-------------| | `market:ticker:{exchange}:{symbol}` | Ticker | `on_ticker()` | | `market:kline:{exchange}:{symbol}:{interval}` | Kline | `on_kline()` | | `market:trade:{exchange}:{symbol}` | Trade | `on_trade()` | | `market:orderbook:{exchange}:{symbol}` | OrderBook | `on_orderbook()` | 数据格式使用 **MessagePack** 序列化,Python 侧用 `msgpack.unpackb()` 反序列化为 Pydantic 模型。 ### 下游:交易执行模块 引擎通过 `SignalBus` 发布信号,executor 订阅: ```python # executor 侧订阅 signal_bus.subscribe("order", executor.handle_signal) ``` ### 下游:风控模块 风控模块也订阅信号总线,在 executor 执行前进行风控检查: ```python # risk 侧订阅 signal_bus.subscribe("order", risk_manager.check_signal) ``` ### 下游:监控模块 监控模块订阅所有信号用于记录和统计: ```python # monitor 侧订阅 signal_bus.subscribe("*", monitor.record_signal) ``` --- ## 目录结构 ``` engine/ ├── ENGINE.md # 本文档 ├── __init__.py ├── env.yaml # 引擎环境配置 │ ├── common/ # 通用模块(基础数据类型、策略基类、日志) │ ├── __init__.py │ ├── base.py # BaseStrategy 抽象基类 + Signal / StrategyConfig │ ├── models.py # 数据模型:Ticker / Kline / Trade / OrderBook │ └── logger.py # 结构化日志模块 │ ├── manager.py # StrategyManager 策略管理器 ├── signals.py # SignalBus 信号分发器 + SignalFilter 过滤器 ├── backtest.py # BacktestEngine 回测引擎 ├── optimizer.py # ParamOptimizer 参数优化器 ├── main.py # 引擎主循环入口 │ ├── indicators/ # 技术指标计算(基于 TA-Lib / pandas_ta) │ ├── __init__.py │ ├── trend.py # 趋势指标 (MA, EMA, MACD, ADX...) │ ├── momentum.py # 动量指标 (RSI, Stochastic, CCI...) │ ├── volatility.py # 波动率指标 (Bollinger, ATR, Keltner...) │ ├── volume.py # 成交量指标 (OBV, VWAP, MFI...) │ └── composite.py # 复合指标(自定义组合) │ ├── consumers/ # 行情消费器(Redis → 策略馈送) │ ├── __init__.py │ ├── redis.py # Redis Pub/Sub 消费者 │ └── nats.py # NATS 消费者(可选) │ └── tests/ # 引擎测试 ├── __init__.py ├── test_base.py ├── test_manager.py ├── test_signals.py ├── test_backtest.py ├── test_optimizer.py └── fixtures/ # 测试夹具(模拟行情数据) └── sample_klines.json ``` --- ## 开发步骤 ### 第一阶段:核心框架(第 1 周) | 步骤 | 任务 | 产出 | |------|------|------| | 1.1 | 实现 `common/base.py` — `BaseStrategy`、`Signal`、`StrategyConfig` | 策略基类和类型定义 | | 1.2 | 实现 `common/models.py` — `Ticker`、`Kline`、`Trade`、`OrderBook` Pydantic 模型 | 与 TS 对齐的数据模型 | | 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 | | 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 | | 1.4 | 实现 `signals.py` — `SignalBus`、`CooldownFilter`、`DuplicateFilter` | 事件总线和过滤链 | | 1.5 | 实现 `manager.py` — 策略注册、创建、启停 | 策略管理器 | | 1.6 | 实现 `consumers/redis.py` — Redis Pub/Sub 消费者 | 实时行情接收 | | 1.7 | 编写 `main.py` — 引擎主循环串联 | 端到端可运行 | ### 第二阶段:指标与策略示例(第 2 周) | 步骤 | 任务 | 产出 | |------|------|------| | 2.1 | 实现 `indicators/` — MA、EMA、MACD、RSI、Bollinger | 技术指标库 | | 2.2 | 编写示例策略 1:双均线交叉 | `strategies/ma_cross.py` | | 2.3 | 编写示例策略 2:网格交易 | `strategies/grid_trading.py` | | 2.4 | 集成测试:Redis 行情 → 策略 → 信号 | 端到端验证 | ### 第三阶段:回测(第 3 周) | 步骤 | 任务 | 产出 | |------|------|------| | 3.1 | 实现 `backtest.py` — 回测引擎核心 | 事件驱动回测 | | 3.2 | 实现交易模拟逻辑(手续费、滑点) | 精确回测 | | 3.3 | 实现性能指标计算(夏普比率、最大回撤等) | 完整指标 | | 3.4 | 实现回测报告生成(JSON/HTML) | 可视化报告 | | 3.5 | 编写回测测试用例 | 回测验证 | ### 第四阶段:参数优化(第 4 周) | 步骤 | 任务 | 产出 | |------|------|------| | 4.1 | 实现 `optimizer.py` — 网格搜索 | 穷举搜索 | | 4.2 | 集成 Optuna 实现贝叶斯优化 | 智能搜索 | | 4.3 | 实现交叉验证 | 防止过拟合 | | 4.4 | 实现参数重要性分析 | 参数洞察 | --- ## 实现规范 ### 异步编程规范 - 所有 I/O 操作使用 `async/await` - 长时间计算任务使用 `asyncio.to_thread()` 放入线程池 - 避免在协程中阻塞(不用 `time.sleep`,用 `asyncio.sleep`) - 策略的 `on_*` 方法必须是非阻塞的快速返回 ### 错误处理 ```python # 策略错误隔离模式 async def feed_kline(self, kline) -> None: for name, strategy in self._strategies.items(): if not strategy.is_running: continue if strategy.config.symbol != kline.symbol: continue try: signal = await strategy.on_kline(kline) if signal: await self.signal_bus.emit(signal) except Exception as e: logger.error(f"Strategy {name} error: {e}", exc_info=True) # 单策略异常不影响其他策略 ``` ### 日志规范 ```python from common.logger import logger # 使用结构化日志 logger.info("strategy_signal", strategy="ma_cross", symbol="BTCUSDT", side="BUY", confidence=0.85, timestamp=1234567890) logger.warning("strategy_error", strategy="grid", error=str(e)) logger.info("backtest_complete", symbol="ETHUSDT", sharpe=1.45, trades=230) ``` ### 类型规范 - 所有公开接口使用类型注解 - 数据模型使用 Pydantic `BaseModel` - 策略配置继承 `StrategyConfig` 并扩展 - 信号必须通过 `Signal` 模型创建,禁止使用裸 dict ### 测试规范 - 核心逻辑(SignalBus、StrategyManager)单元测试覆盖率 > 90% - 策略逻辑有独立单元测试 - 回测引擎用已知结果的静态数据集验证 - 使用 `pytest-asyncio` 支持异步测试 ### 性能基准 | 指标 | 目标值 | 说明 | |------|--------|------| | 策略单次 `on_kline` 耗时 | < 1ms | 不含 I/O 的纯计算 | | SignalBus 发送延迟 | < 0.1ms | emit 到 handler 开始执行 | | 回测速度 | > 10000 bars/s | 10 万根 K 线 < 10 秒 | | 100 策略并行 | CPU < 50% | 空策略情况下的资源占用 | --- ## 附录 ### A. 示例策略实现(双均线交叉) ```python # strategies/ma_cross.py from typing import Optional from common import BaseStrategy, StrategyConfig, Signal class MACrossConfig(StrategyConfig): fast_period: int = 7 slow_period: int = 25 class MACrossStrategy(BaseStrategy): strategy_type = "ma_cross" def __init__(self, config: MACrossConfig): super().__init__(config) self.config: MACrossConfig = config self._prices: list[float] = [] self._last_signal: Optional[str] = None # "BUY" / "SELL" def _sma(self, data: list[float], period: int) -> float: if len(data) < period: return 0.0 return sum(data[-period:]) / period async def on_kline(self, kline) -> Optional[Signal]: self._prices.append(kline.close) fast = self._sma(self._prices, self.config.fast_period) slow = self._sma(self._prices, self.config.slow_period) if fast == 0 or slow == 0: return None if fast > slow and self._last_signal != "BUY": self._last_signal = "BUY" return Signal( symbol=self.config.symbol, side="BUY", signal_type="MARKET", confidence=0.7, reason=f"Golden cross: MA{self.config.fast_period} > MA{self.config.slow_period}", timestamp=kline.timestamp, ) if fast < slow and self._last_signal != "SELL": self._last_signal = "SELL" return Signal( symbol=self.config.symbol, side="SELL", signal_type="MARKET", confidence=0.7, reason=f"Death cross: MA{self.config.fast_period} < MA{self.config.slow_period}", timestamp=kline.timestamp, ) return None ``` ### B. 引擎配置示例(engine/env.yaml) ```yaml # engine/env.yaml engine: name: "trade-engine" strategies_package: "strategies" max_concurrent_strategies: 50 redis: url: "redis://localhost:6379/0" channels: kline: "market:kline:*" ticker: "market:ticker:*" orderbook: "market:orderbook:*" signals: cooldown_seconds: 1.0 # 同 symbol 下单冷却时间 min_confidence: 0.3 # 最低信号置信度 max_signals_per_minute: 60 # 每分钟最大信号数 backtest: default_commission: 0.001 default_slippage: 0.0005 default_capital: 10000.0 optimizer: default_method: "bayesian" default_objective: "sharpe_ratio" max_iterations: 200 n_jobs: 4 early_stopping: 20 ``` ### C. 相关文档索引 | 文档 | 路径 | |------|------| | 系统总体架构 | `../README.md` | | 数据模块开发指南 | `../data/ENGINE.md` (待创建) | | 交易执行模块 | `../executor/ENGINE.md` (待创建) | | 风控模块 | `../risk/ENGINE.md` (待创建) | | 公共模块 | `./common/__init__.py` |