Files
trade/engine/ENGINE.md
T
Rekey 4d66a86234 docs: 添加项目规范与引擎设计文档
- AGENTS.md: AI Agent 规则,定义项目运行环境、架构约定、常用命令
- ENGINE.md: Python 引擎架构设计文档,涵盖策略管理、信号总线、回测引擎等模块设计
2026-06-12 10:26:30 +08:00

46 KiB
Raw Blame History

策略引擎开发文档

数字货币量化交易系统 — 策略引擎模块(Python 3.10+)设计与开发指南


目录

  1. 概述与定位
  2. 核心架构
  3. 策略基类设计
  4. 策略管理器
  5. 信号分发器
  6. 回测引擎
  7. 参数优化器
  8. 数据流与生命周期
  9. 与上下游模块的交互
  10. 目录结构
  11. 开发步骤
  12. 实现规范

概述与定位

策略引擎是整个量化交易系统的核心调度中心,位于 Python 业务层,负责:

  • 策略生命周期管理:注册、热加载、启动、暂停、停止、卸载策略
  • 实时信号生成:消费 TS 数据模块推送的行情数据,驱动策略产生交易信号
  • 回测验证:使用历史数据模拟策略执行,评估收益、回撤、夏普比率等指标
  • 参数搜索:通过网格搜索或贝叶斯优化自动寻找最优策略参数
  • 下游集成:将交易信号通过信号分发器路由到交易执行模块和风控模块

在系统中的位置

                          ┌──────────────────────────┐
                          │    策略引擎  🐍 Python     │
                          │                          │
   Redis Pub/Sub ──────►  │  ┌────────────────────┐  │
   行情数据                 │  │   信号分发器        │──► executor/
                          │  └────────────────────┘  │
   TimescaleDB ◄───────   │  ┌────────────────────┐  │
   历史数据(回测)         │  │   策略管理器        │  │
                          │  │  ┌──────┬──────┐   │  │
                          │  │  │策略A │策略B │   │  │
                          │  │  └──────┴──────┘   │  │
                          │  └────────────────────┘  │
                          │  ┌────────────────────┐  │
                          │  │   回测引擎          │  │
                          │  └────────────────────┘  │
                          │  ┌────────────────────┐  │
                          │  │   参数优化器        │  │
                          │  └────────────────────┘  │
                          └──────────────────────────┘

核心架构

模块依赖图

                        ┌──────────────────┐
                        │ engine/common/   │  ← 配置、日志、数据模型、策略基类
                        └────────┬─────────┘
                                 │
            ┌────────────────────┼────────────────────┐
            │                    │                    │
       ┌────▼────┐        ┌──────▼──────┐      ┌─────▼──────┐
       │ base.py │        │  manager.py │      │ consumers/ │
       │策略基类  │◄───────│  策略管理器  │      │ 行情消费    │
       └────┬────┘        └──────┬──────┘      └────────────┘
            │                    │
            │             ┌──────▼──────┐
       ┌────▼────┐        │ signals.py  │      ┌─────────┐
       │strategies│       │  信号分发器  │─────►│executor/│
       │ 策略实现 │        └─────────────┘      └─────────┘
       └────┬────┘
            │
       ┌────▼────┐        ┌──────────────┐
       │backtest │        │ optimizer.py │
       │回测引擎  │◄───────│  参数优化器   │
       └─────────┘        └──────────────┘

设计原则

原则 说明
插件化 策略以插件形式注册,新增策略无需修改引擎核心代码
异步优先 全部 I/O 操作使用 asyncio,保证高并发下的低延迟
类型安全 核心接口用 ABC + Pydantic 模型约束,运行时校验
无状态 引擎本身不持有策略状态,状态由策略实例自行管理
错误隔离 单个策略异常不影响其他策略运行,也不影响引擎主循环
可观测 每个生命周期事件埋点,输出结构化日志和 Prometheus 指标

策略基类设计

抽象基类

# engine/common/base.py
from abc import ABC, abstractmethod
from typing import Optional
from pydantic import BaseModel

from .models import Ticker, Kline, OrderBook, Trade


class StrategyConfig(BaseModel):
    """策略配置(由具体策略子类化扩展)"""
    name: str
    symbol: str
    exchange: str = "binance"
    enabled: bool = True
    max_position_pct: float = 0.1          # 最大仓位比例
    stop_loss_pct: Optional[float] = None  # 止损百分比
    take_profit_pct: Optional[float] = None # 止盈百分比


class Signal(BaseModel):
    """交易信号"""
    symbol: str
    side: str                             # "BUY" / "SELL"
    signal_type: str                      # "MARKET" / "LIMIT" / "CANCEL"
    price: Optional[float] = None         # 限价单价格
    quantity: Optional[float] = None      # 下单数量
    confidence: float = 1.0               # 信号置信度 [0, 1]
    reason: str = ""                      # 信号生成原因(便于审计)
    timestamp: float                      # 信号时间戳


class BaseStrategy(ABC):
    """所有策略的抽象基类"""

    # 策略类型标识,子类必须覆盖
    strategy_type: str = "base"

    def __init__(self, config: StrategyConfig):
        self.config = config
        self.is_running = False
        self.pnl = 0.0
        self.trade_count = 0

    # ── 生命周期钩子 ──

    async def on_start(self) -> None:
        """策略启动时调用(加载初始状态、预热等)"""
        self.is_running = True

    async def on_stop(self) -> None:
        """策略停止时调用(清理、平仓等)"""
        self.is_running = False

    async def on_pause(self) -> None:
        """策略暂停时调用"""
        pass

    async def on_resume(self) -> None:
        """策略恢复时调用"""
        pass

    # ── 行情事件处理器(子类按需实现)──

    @abstractmethod
    async def on_kline(self, kline: Kline) -> Optional[Signal]:
        """处理 K 线数据,返回交易信号"""
        ...

    async def on_ticker(self, ticker: Ticker) -> Optional[Signal]:
        """处理 Ticker 数据(可选实现)"""
        return None

    async def on_trade(self, trade: Trade) -> Optional[Signal]:
        """处理逐笔成交数据(可选实现)"""
        return None

    async def on_orderbook(self, orderbook: OrderBook) -> Optional[Signal]:
        """处理深度数据(可选实现)"""
        return None

    # ── 交易反馈钩子 ──

    async def on_order_filled(self, order_result) -> None:
        """订单成交回调"""
        pass

    async def on_order_cancelled(self, order_id: str) -> None:
        """订单撤销回调"""
        pass

    # ── 辅助方法 ──

    async def get_klines(
        self, interval: str, limit: int = 100
    ) -> list[Kline]:
        """从 TimescaleDB 获取历史 K 线(回测和预热用)"""
        ...

    def log(self, level: str, message: str, **kwargs) -> None:
        """结构化日志"""
        ...

策略状态机

         register
            │
            ▼
        ┌────────┐    start    ┌─────────┐
        │ LOADED │ ──────────► │ RUNNING │
        └────────┘             └────┬─────┘
            ▲                       │
            │              ┌────────┴────────┐
            │              ▼                 ▼
            │         ┌─────────┐      ┌──────────┐
            └─────────│ PAUSED  │      │ STOPPING │
            unload    └────┬─────┘      └────┬─────┘
                          │     resume      │
                          └───► RUNNING     │
                                            ▼
                                       ┌──────────┐
                                       │ STOPPED  │
                                       └────┬─────┘
                                            │ unload
                                            ▼
                                        (removed)

策略生命周期事件流

on_start()
    │
    ▼
┌──────────────────────────────────────────────────────────┐
│                    主循环(事件驱动)                       │
│                                                          │
│  on_kline(kline) ──► Signal? ──► signal_bus.emit(signal) │
│  on_ticker(ticker) ─► Signal? ──► signal_bus.emit(signal) │
│  on_orderbook(ob) ──► Signal? ──► signal_bus.emit(signal) │
│                                                          │
│  on_order_filled(result)   ←── executor 回调              │
│  on_order_cancelled(id)    ←── executor 回调              │
└──────────────────────────────────────────────────────────┘
    │
    ▼
on_stop()

策略管理器

策略管理器是引擎的入口组件,负责所有策略实例的注册、发现、加载和生命周期调度。

接口设计

# engine/manager.py
import importlib
import pkgutil
from pathlib import Path
from typing import Type, Optional
from .common.logger import logger

from .common import BaseStrategy, StrategyConfig, Signal
from .signals import SignalBus


class StrategyManager:
    """
    策略管理器:策略注册、热加载、生命周期调度

    用法:
        manager = StrategyManager(signal_bus=signal_bus)
        manager.discover_strategies("strategies")
        await manager.start_all()
    """

    def __init__(self, signal_bus: SignalBus):
        self._strategies: dict[str, BaseStrategy] = {}
        self._strategy_classes: dict[str, Type[BaseStrategy]] = {}
        self.signal_bus = signal_bus

    # ── 策略发现与注册 ──

    def discover_strategies(self, package_path: str = "strategies") -> list[str]:
        """
        自动发现策略包下的所有策略类。

        约定:
        - 策略文件放在 `strategies/` 目录下
        - 策略类必须继承 `BaseStrategy` 且在其模块中定义

        返回发现的策略类型名列表。
        """
        ...

    def register(self, strategy_cls: Type[BaseStrategy]) -> None:
        """手动注册策略类"""
        ...

    # ── 策略实例管理 ──

    def create(
        self, strategy_type: str, config: StrategyConfig
    ) -> BaseStrategy:
        """
        根据策略类型创建实例。

        Raises:
            ValueError: 策略类型未注册
        """
        ...

    async def start(self, name: str) -> None:
        """启动指定策略"""
        ...

    async def stop(self, name: str) -> None:
        """停止指定策略"""
        ...

    async def pause(self, name: str) -> None:
        """暂停指定策略(保留状态,不接收行情)"""
        ...

    async def resume(self, name: str) -> None:
        """恢复暂停的策略"""
        ...

    async def restart(self, name: str, new_config: Optional[StrategyConfig] = None) -> BaseStrategy:
        """重启策略(可选地更新配置)"""
        ...

    def get(self, name: str) -> Optional[BaseStrategy]:
        """获取策略实例"""
        ...

    def list_strategies(self) -> list[dict]:
        """列出所有策略的状态"""
        ...

    # ── 批量操作 ──

    async def start_all(self) -> None:
        """启动所有 enabled 策略"""
        ...

    async def stop_all(self) -> None:
        """停止所有策略"""
        ...

    # ── 行情入口(由主循环调用)──

    async def feed_kline(self, kline) -> None:
        """将 K 线数据分发到对应 symbol 的策略"""
        ...

    async def feed_ticker(self, ticker) -> None:
        """将 Ticker 分发到对应 symbol 的策略"""
        ...

    async def feed_orderbook(self, orderbook) -> None:
        """将 OrderBook 分发到对应 symbol 的策略"""
        ...

    # ── 交易反馈入口 ──

    async def notify_order_filled(self, strategy_name: str, order_result) -> None:
        """通知策略订单已成交"""
        ...

    # ── 统计与监控 ──

    def stats(self) -> dict:
        """返回引擎统计信息"""
        ...

策略热加载机制

                                策略文件变更
                                      │
                                      ▼
                            ┌──────────────────┐
                            │  文件监听器       │
                            │  (watchdog)      │
                            └────────┬─────────┘
                                     │
                          ┌──────────▼──────────┐
                          │ 重新 importlib.reload│
                          └──────────┬──────────┘
                                     │
                          ┌──────────▼──────────┐
                          │ 更新 _strategy_classes│
                          └──────────┬──────────┘
                                     │
                          ┌──────────▼──────────┐
                          │ 重新创建策略实例      │
                          │ (stop → create → start)│
                          └─────────────────────┘

信号分发器

信号分发器是策略到交易执行模块之间的解耦层,提供事件总线能力。

设计目标

  • 解耦:策略不直接调用交易 API,只 emit 信号
  • 过滤:同一 symbol 的冲突信号自动去重/合并
  • 优先级:支持信号优先级排序(高置信度优先)
  • 限流:防止策略过度交易,内置冷却时间
  • 审计:所有信号记录到数据库,可追溯

接口设计

# engine/signals.py
import asyncio
from collections import defaultdict
from typing import Callable, Awaitable

from .common import Signal


SignalHandler = Callable[[Signal], Awaitable[None]]


class SignalBus:
    """信号事件总线"""

    def __init__(self):
        self._handlers: dict[str, list[SignalHandler]] = defaultdict(list)
        self._signal_history: list[Signal] = []
        self._cooldowns: dict[str, float] = {}  # symbol -> 下次允许信号时间

    def subscribe(self, event_type: str, handler: SignalHandler) -> None:
        """
        订阅信号类型。

        event_type: "order" / "alert" / "log" 或自定义
        handler: async callable,接收 Signal 参数
        """
        ...

    async def emit(self, signal: Signal) -> None:
        """
        发布信号。会经过以下处理链:
        1. 冷却检查
        2. 冲突检测
        3. 优先级排序
        4. 分发给所有订阅者
        5. 记录到审计日志
        """
        ...

    def set_cooldown(self, symbol: str, seconds: float) -> None:
        """设置 symbol 的下单冷却时间"""
        ...

    def recent_signals(self, symbol: str, n: int = 10) -> list[Signal]:
        """获取最近的信号历史"""
        ...


class SignalFilter:
    """信号过滤器基类(责任链模式)"""

    async def filter(self, signal: Signal) -> Signal | None:
        """返回过滤后的信号,返回 None 表示丢弃"""
        return signal


class CooldownFilter(SignalFilter):
    """冷却时间过滤器"""
    ...


class DuplicateFilter(SignalFilter):
    """重复信号过滤器(同一 symbol 短期内相同方向合并)"""
    ...


class ConfidenceFilter(SignalFilter):
    """置信度过滤器(低于阈值的信号丢弃)"""
    ...

信号流向

策略A.on_kline() ──► Signal ──┐
策略B.on_ticker() ─► Signal ──┤
策略C.on_kline() ──► Signal ──┤
                               ▼
                      ┌────────────────┐
                      │   SignalBus    │
                      │                │
                      │  ┌──────────┐  │
                      │  │ 冷却检查  │  │
                      │  └────┬─────┘  │
                      │  ┌────▼─────┐  │
                      │  │ 去重合并  │  │
                      │  └────┬─────┘  │
                      │  ┌────▼─────┐  │
                      │  │ 优先级排序│  │
                      │  └────┬─────┘  │
                      └───────┼────────┘
                              │
                ┌─────────────┼─────────────┐
                ▼             ▼             ▼
         ┌──────────┐  ┌──────────┐  ┌──────────┐
         │ executor │  │  risk/   │  │ monitor/ │
         │ 交易执行  │  │  风控    │  │  日志    │
         └──────────┘  └──────────┘  └──────────┘

回测引擎

回测引擎使用历史 K 线数据模拟策略执行,评估策略在历史行情中的表现。

设计目标

  • 事件驱动:模拟真实行情推送,按时间顺序逐根 K 线喂给策略
  • 精确模拟:考虑手续费、滑点、最小下单量等实际交易约束
  • 多维度指标:总收益率、年化收益、夏普比率、最大回撤、胜率、盈亏比
  • 可视化输出:资金曲线图、回撤曲线、交易点标注

架构

                    ┌──────────────────────────┐
                    │      回测引擎             │
                    │                          │
  历史K线数据 ─────►│  ┌────────────────────┐  │
  (TimescaleDB)    │  │   BacktestEngine    │  │
                    │  │                    │  │
  策略配置 ────────►│  │  - 加载历史K线      │  │
                    │  │  - 逐根推送给策略    │  │
  策略类 ──────────►│  │  - 模拟订单成交      │  │
                    │  │  - 计算性能指标      │  │
                    │  │  - 生成回测报告      │  │
                    │  └────────────────────┘  │
                    └──────────┬───────────────┘
                               │
                               ▼
                    ┌──────────────────────────┐
                    │      回测结果             │
                    │  - equity_curve          │
                    │  - trades                │
                    │  - metrics               │
                    │  - report (HTML/JSON)    │
                    └──────────────────────────┘

接口设计

# engine/backtest.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Type
from pydantic import BaseModel

from .common import BaseStrategy, StrategyConfig


@dataclass
class BacktestConfig:
    """回测配置"""
    symbol: str
    exchange: str = "binance"
    interval: str = "1h"
    start_time: datetime
    end_time: datetime

    # 交易成本
    commission_pct: float = 0.001       # 手续费率 (0.1%)
    slippage_pct: float = 0.0005        # 滑点 (0.05%)
    min_order_qty: float = 0.001        # 最小下单量

    # 资金
    initial_capital: float = 10_000.0   # 初始资金

    # 数据
    warmup_bars: int = 100              # 预热 K 线条数


@dataclass
class BacktestTrade:
    """单笔回测交易记录"""
    timestamp: datetime
    symbol: str
    side: str
    price: float
    quantity: float
    commission: float
    slippage: float
    pnl: Optional[float] = None
    reason: str = ""


@dataclass
class BacktestMetrics:
    """回测性能指标"""
    total_return_pct: float             # 总收益率
    annual_return_pct: float            # 年化收益率
    sharpe_ratio: float                 # 夏普比率
    sortino_ratio: float                # 索提诺比率
    max_drawdown_pct: float             # 最大回撤
    max_drawdown_duration: int          # 最大回撤持续天数
    win_rate: float                     # 胜率
    profit_factor: float                # 盈亏比
    total_trades: int                   # 总交易次数
    avg_holding_hours: float            # 平均持仓时间(小时)
    calmar_ratio: float                 # 卡尔玛比率


@dataclass
class BacktestResult:
    """完整回测结果"""
    config: BacktestConfig
    strategy_config: StrategyConfig
    metrics: BacktestMetrics
    trades: list[BacktestTrade]
    equity_curve: list[dict]            # [{timestamp, equity, drawdown}, ...]


class BacktestEngine:
    """回测引擎"""

    def __init__(self, config: BacktestConfig):
        self.config = config
        self._klines: list = []
        self._trades: list[BacktestTrade] = []
        self._equity: list[dict] = []

    async def load_data(self) -> None:
        """从 TimescaleDB 加载历史 K 线数据"""
        ...

    async def run(
        self,
        strategy_cls: Type[BaseStrategy],
        strategy_config: StrategyConfig,
    ) -> BacktestResult:
        """
        执行回测。

        流程:
        1. 预热:先喂 warmup_bars 根 K 线但不产生交易
        2. 回测循环:逐根 K 线推送给策略
        3. 收到 Signal 时模拟订单成交
        4. 每根 K 线更新资金曲线
        5. 计算最终指标
        """
        ...

    async def run_batch(
        self,
        strategy_cls: Type[BaseStrategy],
        configs: list[StrategyConfig],
        max_concurrency: int = 4,
    ) -> list[BacktestResult]:
        """批量回测(并行执行,用于参数扫描)"""
        ...

    @staticmethod
    def compute_metrics(
        equity_curve: list[dict],
        trades: list[BacktestTrade],
        config: BacktestConfig,
    ) -> BacktestMetrics:
        """从资金曲线和交易记录计算性能指标"""
        ...

    def generate_report(self, result: BacktestResult, format: str = "json") -> str:
        """生成回测报告(json/html/markdown"""
        ...

    def plot(self, result: BacktestResult) -> None:
        """绘制资金曲线和回撤图(matplotlib"""
        ...

回测执行流程

                        开始
                         │
                         ▼
              ┌─────────────────────┐
              │  加载历史 K 线数据    │
              │  (TimescaleDB)       │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  预热阶段             │
              │  喂入 warmup_bars    │
              │  策略初始化指标       │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  回测主循环           │
              │                     │
              │  for kline in data: │
              │    signal = strat.  │
              │      on_kline(kline)│
              │    if signal:       │
              │      simulate_trade │
              │    update_equity()  │
              │                     │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  计算性能指标         │
              │  - 收益率            │
              │  - 夏普比率          │
              │  - 最大回撤          │
              │  - 胜率等            │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  生成回测报告         │
              │  - JSON 结果         │
              │  - 资金曲线图         │
              │  - HTML 报告         │
              └─────────────────────┘

交易模拟逻辑

收到 Signal
    │
    ▼
┌──────────────┐
│ 输入价格 =     │
│ kline.close   │
│ + slippage    │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ 检查余额       │──── 不足 ──► 跳过,记录日志
│ 检查最小下单量 │
└──────┬───────┘
       │ 通过
       ▼
┌──────────────┐
│ 执行成交       │
│ - 更新持仓     │
│ - 扣除手续费    │
│ - 记录交易     │
│ - 计算盈亏     │
└──────────────┘

参数优化器

参数优化器基于回测引擎,自动搜索最优策略参数组合。

支持的优化策略

方法 适用场景 特点
网格搜索 参数空间小(2-3 维) 穷举所有组合,结果完备但慢
随机搜索 参数空间中等 比网格搜索更高效
贝叶斯优化 参数空间大 利用先验知识,收敛快
遗传算法 复杂非线性参数空间 全局搜索能力强

接口设计

# engine/optimizer.py
from dataclasses import dataclass, field
from typing import Callable, Any

from backtest import BacktestEngine, BacktestConfig, BacktestResult
from common import BaseStrategy, StrategyConfig


@dataclass
class ParamSpec:
    """参数规格定义"""
    name: str
    type: str                                  # "int" / "float" / "choice"
    low: Optional[float] = None                # 数值参数下界
    high: Optional[float] = None               # 数值参数上界
    step: Optional[float] = None               # 步长(网格搜索用)
    choices: Optional[list[Any]] = None        # 离散选择
    log_scale: bool = False                    # 是否对数尺度搜索


@dataclass
class OptimizationConfig:
    """优化器配置"""
    method: str = "bayesian"                   # "grid" / "random" / "bayesian" / "genetic"
    objective: str = "sharpe_ratio"            # 优化目标指标
    max_iterations: int = 100                  # 最大迭代次数
    n_jobs: int = 4                            # 并行任务数
    early_stopping_rounds: int = 10            # 早停轮数
    cv_folds: int = 3                          # 交叉验证折数


@dataclass
class OptimizationResult:
    """单次优化结果"""
    params: dict[str, Any]                     # 最优参数
    score: float                               # 目标函数值
    all_trials: list[dict]                     # 所有试验记录
    best_result: BacktestResult                # 最优参数的回测结果


class ParamOptimizer:
    """参数优化器"""

    def __init__(
        self,
        backtest_config: BacktestConfig,
        strategy_cls: Type[BaseStrategy],
        base_config: StrategyConfig,
        param_specs: list[ParamSpec],
        opt_config: OptimizationConfig = OptimizationConfig(),
    ):
        ...

    async def optimize(self) -> OptimizationResult:
        """
        执行参数优化。

        返回最优参数组合及对应的回测结果。
        """
        ...

    def suggest_params(self, n: int = 5) -> list[dict]:
        """获取 top-n 参数组合建议"""
        ...

    def plot_importance(self) -> None:
        """绘制参数重要性图"""
        ...

    def plot_parallel_coordinates(self) -> None:
        """绘制参数平行坐标图"""
        ...

优化流程

                  ┌────────────────┐
                  │  定义参数空间    │
                  │  ParamSpec[]    │
                  └───────┬────────┘
                          │
                          ▼
                  ┌────────────────┐
                  │  选择优化算法    │
                  │  (网格/贝叶斯等) │
                  └───────┬────────┘
                          │
                 ┌────────▼────────┐
                 │   优化主循环      │
                 │                 │
                 │  while n < max: │
                 │    params =     │
                 │      suggest()  │
                 │    result =     │
                 │      backtest() │
                 │    score =      │
                 │      metric()   │
                 │    update()     │
                 │    check_early()│
                 │                 │
                 └────────┬────────┘
                          │
                          ▼
                  ┌────────────────┐
                  │  返回最优结果    │
                  │  + 所有试验记录  │
                  └────────────────┘

目标函数

目标函数可选指标(最小化回撤时取负值):

sharpe_ratio     — 夏普比率(风险调整后收益,推荐)
sortino_ratio    — 索提诺比率(只考虑下行波动)
total_return     — 总收益率
calmar_ratio     — 卡尔玛比率(收益/最大回撤)
profit_factor    — 盈亏比
custom           — 自定义组合指标

数据流与生命周期

实时交易数据流

Redis Pub/Sub                        engine/                         executor/
─────────────                       ─────────                        ─────────

market:kline:* ──► RedisConsumer ──► Manager.feed_kline()
                                          │
                                          ▼
                                    路由到对应 symbol 策略
                                          │
                                    ┌─────┴─────┐
                                    ▼           ▼
                              策略A.on_kline  策略B.on_kline
                                    │           │
                               Signal?      Signal?
                                    │           │
                                    └─────┬─────┘
                                          ▼
                                    SignalBus.emit()
                                          │
                              ┌───────────┼───────────┐
                              ▼           ▼           ▼
                         冷却检查      去重合并     优先级排序
                              │           │           │
                              └───────────┼───────────┘
                                          ▼
                                    ┌──────────┐
                                    │ risk/    │    ← 风控检查
                                    │ manager  │
                                    └────┬─────┘
                                         │ passed
                                         ▼
                                    ┌──────────┐
                                    │ executor │    ← 实际下单
                                    └──────────┘
                                         │
                                         ▼
                                   订单状态回调
                                         │
                                         ▼
                              Manager.notify_order_filled()
                                         │
                                         ▼
                               策略.on_order_filled()

回测数据流

TimescaleDB ──► BacktestEngine.load_data()
                    │
                    ▼
              历史 K 线列表
                    │
                    ▼
              BacktestEngine.run()
                    │
                    ▼
              逐根推送 K 线 ──► 策略.on_kline()
                    │                │
                    │            Signal?
                    │                │
                    ▼                ▼
              更新资金曲线    simulate_trade()
                    │
                    ▼
              计算指标 ──► BacktestResult

引擎主循环

# engine/main.py — 主程序入口示意
import asyncio
from config import settings
from common.storage import TimescaleDBReader
from manager import StrategyManager
from signals import SignalBus


async def main():
    # 1. 初始化信号总线
    signal_bus = SignalBus()

    # 2. 初始化策略管理器
    manager = StrategyManager(signal_bus)
    manager.discover_strategies("strategies")

    # 3. 创建并注册策略实例
    for cfg in settings.strategies:
        manager.create(cfg.type, cfg)

    # 4. 启动所有策略
    await manager.start_all()

    # 5. 订阅 Redis 行情
    consumer = RedisMarketConsumer(settings.redis_url)
    await consumer.subscribe_kline(manager.feed_kline)
    await consumer.subscribe_ticker(manager.feed_ticker)

    # 6. 保持运行
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())

与上下游模块的交互

上游:TypeScript 数据模块

引擎通过 Redis Pub/Sub 消费 TS 侧推送的行情数据:

Redis 频道 数据类型 对应策略方法
market:ticker:{exchange}:{symbol} Ticker on_ticker()
market:kline:{exchange}:{symbol}:{interval} Kline on_kline()
market:trade:{exchange}:{symbol} Trade on_trade()
market:orderbook:{exchange}:{symbol} OrderBook on_orderbook()

数据格式使用 MessagePack 序列化,Python 侧用 msgpack.unpackb() 反序列化为 Pydantic 模型。

下游:交易执行模块

引擎通过 SignalBus 发布信号,executor 订阅:

# executor 侧订阅
signal_bus.subscribe("order", executor.handle_signal)

下游:风控模块

风控模块也订阅信号总线,在 executor 执行前进行风控检查:

# risk 侧订阅
signal_bus.subscribe("order", risk_manager.check_signal)

下游:监控模块

监控模块订阅所有信号用于记录和统计:

# monitor 侧订阅
signal_bus.subscribe("*", monitor.record_signal)

目录结构

engine/
├── ENGINE.md                     # 本文档
├── __init__.py
├── env.yaml                      # 引擎环境配置
│
├── common/                       # 通用模块(基础数据类型、策略基类、日志)
│   ├── __init__.py
│   ├── base.py                   # BaseStrategy 抽象基类 + Signal / StrategyConfig
│   ├── models.py                 # 数据模型:Ticker / Kline / Trade / OrderBook
│   └── logger.py                 # 结构化日志模块
│
├── manager.py                    # StrategyManager 策略管理器
├── signals.py                    # SignalBus 信号分发器 + SignalFilter 过滤器
├── backtest.py                   # BacktestEngine 回测引擎
├── optimizer.py                  # ParamOptimizer 参数优化器
├── main.py                       # 引擎主循环入口
│
├── indicators/                   # 技术指标计算(基于 TA-Lib / pandas_ta
│   ├── __init__.py
│   ├── trend.py                  # 趋势指标 (MA, EMA, MACD, ADX...)
│   ├── momentum.py               # 动量指标 (RSI, Stochastic, CCI...)
│   ├── volatility.py             # 波动率指标 (Bollinger, ATR, Keltner...)
│   ├── volume.py                 # 成交量指标 (OBV, VWAP, MFI...)
│   └── composite.py              # 复合指标(自定义组合)
│
├── consumers/                    # 行情消费器(Redis → 策略馈送)
│   ├── __init__.py
│   ├── redis.py                  # Redis Pub/Sub 消费者
│   └── nats.py                   # NATS 消费者(可选)
│
└── tests/                        # 引擎测试
    ├── __init__.py
    ├── test_base.py
    ├── test_manager.py
    ├── test_signals.py
    ├── test_backtest.py
    ├── test_optimizer.py
    └── fixtures/                 # 测试夹具(模拟行情数据)
        └── sample_klines.json

开发步骤

第一阶段:核心框架(第 1 周)

步骤 任务 产出
1.1 实现 common/base.pyBaseStrategySignalStrategyConfig 策略基类和类型定义
1.2 实现 common/models.pyTickerKlineTradeOrderBook Pydantic 模型 与 TS 对齐的数据模型
1.3 实现 common/logger.py — 结构化日志模块 统一日志输出
1.3 实现 common/logger.py — 结构化日志模块 统一日志输出
1.4 实现 signals.pySignalBusCooldownFilterDuplicateFilter 事件总线和过滤链
1.5 实现 manager.py — 策略注册、创建、启停 策略管理器
1.6 实现 consumers/redis.py — Redis Pub/Sub 消费者 实时行情接收
1.7 编写 main.py — 引擎主循环串联 端到端可运行

第二阶段:指标与策略示例(第 2 周)

步骤 任务 产出
2.1 实现 indicators/ — MA、EMA、MACD、RSI、Bollinger 技术指标库
2.2 编写示例策略 1:双均线交叉 strategies/ma_cross.py
2.3 编写示例策略 2:网格交易 strategies/grid_trading.py
2.4 集成测试:Redis 行情 → 策略 → 信号 端到端验证

第三阶段:回测(第 3 周)

步骤 任务 产出
3.1 实现 backtest.py — 回测引擎核心 事件驱动回测
3.2 实现交易模拟逻辑(手续费、滑点) 精确回测
3.3 实现性能指标计算(夏普比率、最大回撤等) 完整指标
3.4 实现回测报告生成(JSON/HTML 可视化报告
3.5 编写回测测试用例 回测验证

第四阶段:参数优化(第 4 周)

步骤 任务 产出
4.1 实现 optimizer.py — 网格搜索 穷举搜索
4.2 集成 Optuna 实现贝叶斯优化 智能搜索
4.3 实现交叉验证 防止过拟合
4.4 实现参数重要性分析 参数洞察

实现规范

异步编程规范

  • 所有 I/O 操作使用 async/await
  • 长时间计算任务使用 asyncio.to_thread() 放入线程池
  • 避免在协程中阻塞(不用 time.sleep,用 asyncio.sleep
  • 策略的 on_* 方法必须是非阻塞的快速返回

错误处理

# 策略错误隔离模式
async def feed_kline(self, kline) -> None:
    for name, strategy in self._strategies.items():
        if not strategy.is_running:
            continue
        if strategy.config.symbol != kline.symbol:
            continue
        try:
            signal = await strategy.on_kline(kline)
            if signal:
                await self.signal_bus.emit(signal)
        except Exception as e:
            logger.error(f"Strategy {name} error: {e}", exc_info=True)
            # 单策略异常不影响其他策略

日志规范

from common.logger import logger

# 使用结构化日志
logger.info("strategy_signal", strategy="ma_cross", symbol="BTCUSDT",
            side="BUY", confidence=0.85, timestamp=1234567890)
logger.warning("strategy_error", strategy="grid", error=str(e))
logger.info("backtest_complete", symbol="ETHUSDT", sharpe=1.45, trades=230)

类型规范

  • 所有公开接口使用类型注解
  • 数据模型使用 Pydantic BaseModel
  • 策略配置继承 StrategyConfig 并扩展
  • 信号必须通过 Signal 模型创建,禁止使用裸 dict

测试规范

  • 核心逻辑(SignalBus、StrategyManager)单元测试覆盖率 > 90%
  • 策略逻辑有独立单元测试
  • 回测引擎用已知结果的静态数据集验证
  • 使用 pytest-asyncio 支持异步测试

性能基准

指标 目标值 说明
策略单次 on_kline 耗时 < 1ms 不含 I/O 的纯计算
SignalBus 发送延迟 < 0.1ms emit 到 handler 开始执行
回测速度 > 10000 bars/s 10 万根 K 线 < 10 秒
100 策略并行 CPU < 50% 空策略情况下的资源占用

附录

A. 示例策略实现(双均线交叉)

# strategies/ma_cross.py
from typing import Optional
from common import BaseStrategy, StrategyConfig, Signal


class MACrossConfig(StrategyConfig):
    fast_period: int = 7
    slow_period: int = 25


class MACrossStrategy(BaseStrategy):
    strategy_type = "ma_cross"

    def __init__(self, config: MACrossConfig):
        super().__init__(config)
        self.config: MACrossConfig = config
        self._prices: list[float] = []
        self._last_signal: Optional[str] = None  # "BUY" / "SELL"

    def _sma(self, data: list[float], period: int) -> float:
        if len(data) < period:
            return 0.0
        return sum(data[-period:]) / period

    async def on_kline(self, kline) -> Optional[Signal]:
        self._prices.append(kline.close)

        fast = self._sma(self._prices, self.config.fast_period)
        slow = self._sma(self._prices, self.config.slow_period)

        if fast == 0 or slow == 0:
            return None

        if fast > slow and self._last_signal != "BUY":
            self._last_signal = "BUY"
            return Signal(
                symbol=self.config.symbol,
                side="BUY",
                signal_type="MARKET",
                confidence=0.7,
                reason=f"Golden cross: MA{self.config.fast_period} > MA{self.config.slow_period}",
                timestamp=kline.timestamp,
            )

        if fast < slow and self._last_signal != "SELL":
            self._last_signal = "SELL"
            return Signal(
                symbol=self.config.symbol,
                side="SELL",
                signal_type="MARKET",
                confidence=0.7,
                reason=f"Death cross: MA{self.config.fast_period} < MA{self.config.slow_period}",
                timestamp=kline.timestamp,
            )

        return None

B. 引擎配置示例(engine/env.yaml

# engine/env.yaml
engine:
  name: "trade-engine"
  strategies_package: "strategies"
  max_concurrent_strategies: 50

redis:
  url: "redis://localhost:6379/0"
  channels:
    kline: "market:kline:*"
    ticker: "market:ticker:*"
    orderbook: "market:orderbook:*"

signals:
  cooldown_seconds: 1.0          # 同 symbol 下单冷却时间
  min_confidence: 0.3            # 最低信号置信度
  max_signals_per_minute: 60     # 每分钟最大信号数

backtest:
  default_commission: 0.001
  default_slippage: 0.0005
  default_capital: 10000.0

optimizer:
  default_method: "bayesian"
  default_objective: "sharpe_ratio"
  max_iterations: 200
  n_jobs: 4
  early_stopping: 20

C. 相关文档索引

文档 路径
系统总体架构 ../README.md
数据模块开发指南 ../data/ENGINE.md (待创建)
交易执行模块 ../executor/ENGINE.md (待创建)
风控模块 ../risk/ENGINE.md (待创建)
公共模块 ./common/__init__.py