From 4d66a862349423a5e5a651a0e12ca1e88427882e Mon Sep 17 00:00:00 2001 From: Rekey Date: Fri, 12 Jun 2026 10:26:30 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E6=B7=BB=E5=8A=A0=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E8=A7=84=E8=8C=83=E4=B8=8E=E5=BC=95=E6=93=8E=E8=AE=BE=E8=AE=A1?= =?UTF-8?q?=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AGENTS.md: AI Agent 规则,定义项目运行环境、架构约定、常用命令 - ENGINE.md: Python 引擎架构设计文档,涵盖策略管理、信号总线、回测引擎等模块设计 --- AGENTS.md | 66 +++ engine/ENGINE.md | 1337 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1403 insertions(+) create mode 100644 AGENTS.md create mode 100644 engine/ENGINE.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..0b3eb8e --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,66 @@ +# AGENTS.md + +## 最关键 + +- **运行时是 Bun,不是 Node.js**。执行 TS 文件用 `bun run `,不能用 `node`、`npm`、`npx`。 +- **双语言项目**:`data/` 是 TypeScript (Bun),`engine/` 是 Python 3.10+。两个模块通过 Redis Pub/Sub 通信。 +- **data/ 必须在 data/ 目录下运行**:`package.json` 在 `data/` 中,依赖安装到 `data/node_modules`。命令如 `bun install`、`bun run lint` 需要 `workdir=data`。 +- **engine/ 必须在 engine/ 目录下运行**:Python 虚拟环境在 `engine/.venv/`,导入包使用相对路径(`from common import Kline`)。命令如 `python -c "from common import Kline"` 需要 `workdir=engine`。 + +## 常用命令 + +```bash +# 依赖安装(在 data/ 目录下) +bun install + +# 运行数据补全(拉取历史 K 线) +bun run data/run/exchange.ts --concurrency 2 + +# 运行连续聚合刷新(dry-run 看 SQL,加 --execute 实际执行) +bun run data/run/build_aggregates_sql.ts # 纯输出 SQL +bun run data/run/build_aggregates_sql.ts --execute # 实际刷新 +bun run data/run/build_aggregates_sql.ts --start 2025-01 --end 2025-06 --execute + +# 代码检查与格式化 +bun run lint # eslint +bun run format # prettier + +# 构建检查 +bun run build # tsc 类型检查 +``` + +## 基础设施 + +- **数据库**:`docker compose up -d` 启动 TimescaleDB(端口 5432)+ Adminer(端口 8080)。 +- **配置源**:项目根目录 `env.yaml` 是 TS 和 Python 共享的唯一配置。`data/config/index.ts` 读取并校验它。 +- **数据库连接**:host 在 `env.yaml` 中配,当前指向 `10.0.0.7`(远程)。本地开发需改为 `localhost`。 + +## 架构约定 + +- **`synchronize: false`**:TypeORM 不会自动同步 schema。修改实体后需要手动迁移或手动改表。 +- **`@timescaledb/typeorm` 是 v0.0.1 实验版**。K 线实体的 `@Hypertable` 装饰器可能不稳定。标准 SQL 集成用 TimescaleDB 连续聚合视图(`klines_5m`、`klines_15m` 等)。 +- **数据模型对齐**:TS 侧 `data/types/base.ts` 定义的类型与 Python 侧 `engine/common/models.py` 的 Pydantic 模型必须保持字段一致。TS 侧 K 线价格为 `string` 类型(精度),写库时 `Number()` 转换。 +- **K 线 4 列复合主键**:`[exchange, symbol, interval, time]`。K 线分区列是 `time`(TimescaleDB 要求分区列必须在主键中)。 + +## Python 引擎 + +- **workdir 必须是 engine/**:导入包使用 `from common import ...`(如 `from common import Kline, BaseStrategy`)。 +- **未完成模块**:策略管理器、信号总线、回测引擎、参数优化器仅存在于 `ENGINE.md` 设计文档中,尚未实现。当前仅 `common/base.py`(策略基类)和 `common/models.py`(数据模型)、`common/logger.py`(日志)可用。 +- 引擎入口 `engine/__init__.py` 导出 `Kline, KlineInterval, OrderBook, Ticker, Trade`。 +- 引擎配置在 `engine/env.yaml`(与根 `env.yaml` 不同,是引擎专属配置)。 +- Pydantic v2 的 `field_validator` 处理 TS 侧字符串 → Python float/int 转换。 + +## 项目现状 + +- **已实现**:TS 数据模块的配置加载、TypeORM 实体、Binance REST K 线拉取与批量 UPSERT、连续聚合刷新脚本。 +- **未实现**:WebSocket 行情采集、K 线合成管道、Redis 发布、策略管理器、信号总线、回测、风控、交易执行、API 网关。这些在 `README.md` 和 `engine/ENGINE.md` 中有详细设计文档。 +- **`data/run/main.ts` 不存在**,`dev` 脚本指向的文件尚未创建。当前实际可运行的入口是 `run/exchange.ts`(数据补全)和 `run/build_aggregates_sql.ts`(聚合刷新)。 +- **无测试、无 CI**:`package.json` 定义了 `vitest` 脚本但测试尚未编写。无 CI 配置文件。 + +## 注意事项 + +- **`data/exchanges/rest.ts` 包含硬编码的 Binance API Key**(第 105-106 行),不要提交到公开仓库。 +- `env.yaml` 包含明文数据库密码且被 git 追踪,注意安全。 +- 未安装 Python 依赖(如 pydantic),`engine/` 目录有独立的 `.venv/`。 +- `db/pgsql/` 在 `.gitignore` 中,这是 TimescaleDB 数据目录(Docker volume 映射)。 +- `KLINE_INTERVAL_MS` 常量定义了两处:`data/exchanges/rest.ts` 和 `data/types/kline.ts` 的类型定义。新增周期需同步。 diff --git a/engine/ENGINE.md b/engine/ENGINE.md new file mode 100644 index 0000000..03c3999 --- /dev/null +++ b/engine/ENGINE.md @@ -0,0 +1,1337 @@ +# 策略引擎开发文档 + +> 数字货币量化交易系统 — 策略引擎模块(Python 3.10+)设计与开发指南 + +--- + +## 目录 + +1. [概述与定位](#概述与定位) +2. [核心架构](#核心架构) +3. [策略基类设计](#策略基类设计) +4. [策略管理器](#策略管理器) +5. [信号分发器](#信号分发器) +6. [回测引擎](#回测引擎) +7. [参数优化器](#参数优化器) +8. [数据流与生命周期](#数据流与生命周期) +9. [与上下游模块的交互](#与上下游模块的交互) +10. [目录结构](#目录结构) +11. [开发步骤](#开发步骤) +12. [实现规范](#实现规范) + +--- + +## 概述与定位 + +策略引擎是整个量化交易系统的**核心调度中心**,位于 Python 业务层,负责: + +- **策略生命周期管理**:注册、热加载、启动、暂停、停止、卸载策略 +- **实时信号生成**:消费 TS 数据模块推送的行情数据,驱动策略产生交易信号 +- **回测验证**:使用历史数据模拟策略执行,评估收益、回撤、夏普比率等指标 +- **参数搜索**:通过网格搜索或贝叶斯优化自动寻找最优策略参数 +- **下游集成**:将交易信号通过信号分发器路由到交易执行模块和风控模块 + +### 在系统中的位置 + +``` + ┌──────────────────────────┐ + │ 策略引擎 🐍 Python │ + │ │ + Redis Pub/Sub ──────► │ ┌────────────────────┐ │ + 行情数据 │ │ 信号分发器 │──► executor/ + │ └────────────────────┘ │ + TimescaleDB ◄─────── │ ┌────────────────────┐ │ + 历史数据(回测) │ │ 策略管理器 │ │ + │ │ ┌──────┬──────┐ │ │ + │ │ │策略A │策略B │ │ │ + │ │ └──────┴──────┘ │ │ + │ └────────────────────┘ │ + │ ┌────────────────────┐ │ + │ │ 回测引擎 │ │ + │ └────────────────────┘ │ + │ ┌────────────────────┐ │ + │ │ 参数优化器 │ │ + │ └────────────────────┘ │ + └──────────────────────────┘ +``` + +--- + +## 核心架构 + +### 模块依赖图 + +``` + ┌──────────────────┐ + │ engine/common/ │ ← 配置、日志、数据模型、策略基类 + └────────┬─────────┘ + │ + ┌────────────────────┼────────────────────┐ + │ │ │ + ┌────▼────┐ ┌──────▼──────┐ ┌─────▼──────┐ + │ base.py │ │ manager.py │ │ consumers/ │ + │策略基类 │◄───────│ 策略管理器 │ │ 行情消费 │ + └────┬────┘ └──────┬──────┘ └────────────┘ + │ │ + │ ┌──────▼──────┐ + ┌────▼────┐ │ signals.py │ ┌─────────┐ + │strategies│ │ 信号分发器 │─────►│executor/│ + │ 策略实现 │ └─────────────┘ └─────────┘ + └────┬────┘ + │ + ┌────▼────┐ ┌──────────────┐ + │backtest │ │ optimizer.py │ + │回测引擎 │◄───────│ 参数优化器 │ + └─────────┘ └──────────────┘ +``` + +### 设计原则 + +| 原则 | 说明 | +|------|------| +| **插件化** | 策略以插件形式注册,新增策略无需修改引擎核心代码 | +| **异步优先** | 全部 I/O 操作使用 `asyncio`,保证高并发下的低延迟 | +| **类型安全** | 核心接口用 `ABC` + Pydantic 模型约束,运行时校验 | +| **无状态** | 引擎本身不持有策略状态,状态由策略实例自行管理 | +| **错误隔离** | 单个策略异常不影响其他策略运行,也不影响引擎主循环 | +| **可观测** | 每个生命周期事件埋点,输出结构化日志和 Prometheus 指标 | + +--- + +## 策略基类设计 + +### 抽象基类 + +```python +# engine/common/base.py +from abc import ABC, abstractmethod +from typing import Optional +from pydantic import BaseModel + +from .models import Ticker, Kline, OrderBook, Trade + + +class StrategyConfig(BaseModel): + """策略配置(由具体策略子类化扩展)""" + name: str + symbol: str + exchange: str = "binance" + enabled: bool = True + max_position_pct: float = 0.1 # 最大仓位比例 + stop_loss_pct: Optional[float] = None # 止损百分比 + take_profit_pct: Optional[float] = None # 止盈百分比 + + +class Signal(BaseModel): + """交易信号""" + symbol: str + side: str # "BUY" / "SELL" + signal_type: str # "MARKET" / "LIMIT" / "CANCEL" + price: Optional[float] = None # 限价单价格 + quantity: Optional[float] = None # 下单数量 + confidence: float = 1.0 # 信号置信度 [0, 1] + reason: str = "" # 信号生成原因(便于审计) + timestamp: float # 信号时间戳 + + +class BaseStrategy(ABC): + """所有策略的抽象基类""" + + # 策略类型标识,子类必须覆盖 + strategy_type: str = "base" + + def __init__(self, config: StrategyConfig): + self.config = config + self.is_running = False + self.pnl = 0.0 + self.trade_count = 0 + + # ── 生命周期钩子 ── + + async def on_start(self) -> None: + """策略启动时调用(加载初始状态、预热等)""" + self.is_running = True + + async def on_stop(self) -> None: + """策略停止时调用(清理、平仓等)""" + self.is_running = False + + async def on_pause(self) -> None: + """策略暂停时调用""" + pass + + async def on_resume(self) -> None: + """策略恢复时调用""" + pass + + # ── 行情事件处理器(子类按需实现)── + + @abstractmethod + async def on_kline(self, kline: Kline) -> Optional[Signal]: + """处理 K 线数据,返回交易信号""" + ... + + async def on_ticker(self, ticker: Ticker) -> Optional[Signal]: + """处理 Ticker 数据(可选实现)""" + return None + + async def on_trade(self, trade: Trade) -> Optional[Signal]: + """处理逐笔成交数据(可选实现)""" + return None + + async def on_orderbook(self, orderbook: OrderBook) -> Optional[Signal]: + """处理深度数据(可选实现)""" + return None + + # ── 交易反馈钩子 ── + + async def on_order_filled(self, order_result) -> None: + """订单成交回调""" + pass + + async def on_order_cancelled(self, order_id: str) -> None: + """订单撤销回调""" + pass + + # ── 辅助方法 ── + + async def get_klines( + self, interval: str, limit: int = 100 + ) -> list[Kline]: + """从 TimescaleDB 获取历史 K 线(回测和预热用)""" + ... + + def log(self, level: str, message: str, **kwargs) -> None: + """结构化日志""" + ... +``` + +### 策略状态机 + +``` + register + │ + ▼ + ┌────────┐ start ┌─────────┐ + │ LOADED │ ──────────► │ RUNNING │ + └────────┘ └────┬─────┘ + ▲ │ + │ ┌────────┴────────┐ + │ ▼ ▼ + │ ┌─────────┐ ┌──────────┐ + └─────────│ PAUSED │ │ STOPPING │ + unload └────┬─────┘ └────┬─────┘ + │ resume │ + └───► RUNNING │ + ▼ + ┌──────────┐ + │ STOPPED │ + └────┬─────┘ + │ unload + ▼ + (removed) +``` + +### 策略生命周期事件流 + +``` +on_start() + │ + ▼ +┌──────────────────────────────────────────────────────────┐ +│ 主循环(事件驱动) │ +│ │ +│ on_kline(kline) ──► Signal? ──► signal_bus.emit(signal) │ +│ on_ticker(ticker) ─► Signal? ──► signal_bus.emit(signal) │ +│ on_orderbook(ob) ──► Signal? ──► signal_bus.emit(signal) │ +│ │ +│ on_order_filled(result) ←── executor 回调 │ +│ on_order_cancelled(id) ←── executor 回调 │ +└──────────────────────────────────────────────────────────┘ + │ + ▼ +on_stop() +``` + +--- + +## 策略管理器 + +策略管理器是引擎的入口组件,负责所有策略实例的注册、发现、加载和生命周期调度。 + +### 接口设计 + +```python +# engine/manager.py +import importlib +import pkgutil +from pathlib import Path +from typing import Type, Optional +from .common.logger import logger + +from .common import BaseStrategy, StrategyConfig, Signal +from .signals import SignalBus + + +class StrategyManager: + """ + 策略管理器:策略注册、热加载、生命周期调度 + + 用法: + manager = StrategyManager(signal_bus=signal_bus) + manager.discover_strategies("strategies") + await manager.start_all() + """ + + def __init__(self, signal_bus: SignalBus): + self._strategies: dict[str, BaseStrategy] = {} + self._strategy_classes: dict[str, Type[BaseStrategy]] = {} + self.signal_bus = signal_bus + + # ── 策略发现与注册 ── + + def discover_strategies(self, package_path: str = "strategies") -> list[str]: + """ + 自动发现策略包下的所有策略类。 + + 约定: + - 策略文件放在 `strategies/` 目录下 + - 策略类必须继承 `BaseStrategy` 且在其模块中定义 + + 返回发现的策略类型名列表。 + """ + ... + + def register(self, strategy_cls: Type[BaseStrategy]) -> None: + """手动注册策略类""" + ... + + # ── 策略实例管理 ── + + def create( + self, strategy_type: str, config: StrategyConfig + ) -> BaseStrategy: + """ + 根据策略类型创建实例。 + + Raises: + ValueError: 策略类型未注册 + """ + ... + + async def start(self, name: str) -> None: + """启动指定策略""" + ... + + async def stop(self, name: str) -> None: + """停止指定策略""" + ... + + async def pause(self, name: str) -> None: + """暂停指定策略(保留状态,不接收行情)""" + ... + + async def resume(self, name: str) -> None: + """恢复暂停的策略""" + ... + + async def restart(self, name: str, new_config: Optional[StrategyConfig] = None) -> BaseStrategy: + """重启策略(可选地更新配置)""" + ... + + def get(self, name: str) -> Optional[BaseStrategy]: + """获取策略实例""" + ... + + def list_strategies(self) -> list[dict]: + """列出所有策略的状态""" + ... + + # ── 批量操作 ── + + async def start_all(self) -> None: + """启动所有 enabled 策略""" + ... + + async def stop_all(self) -> None: + """停止所有策略""" + ... + + # ── 行情入口(由主循环调用)── + + async def feed_kline(self, kline) -> None: + """将 K 线数据分发到对应 symbol 的策略""" + ... + + async def feed_ticker(self, ticker) -> None: + """将 Ticker 分发到对应 symbol 的策略""" + ... + + async def feed_orderbook(self, orderbook) -> None: + """将 OrderBook 分发到对应 symbol 的策略""" + ... + + # ── 交易反馈入口 ── + + async def notify_order_filled(self, strategy_name: str, order_result) -> None: + """通知策略订单已成交""" + ... + + # ── 统计与监控 ── + + def stats(self) -> dict: + """返回引擎统计信息""" + ... +``` + +### 策略热加载机制 + +``` + 策略文件变更 + │ + ▼ + ┌──────────────────┐ + │ 文件监听器 │ + │ (watchdog) │ + └────────┬─────────┘ + │ + ┌──────────▼──────────┐ + │ 重新 importlib.reload│ + └──────────┬──────────┘ + │ + ┌──────────▼──────────┐ + │ 更新 _strategy_classes│ + └──────────┬──────────┘ + │ + ┌──────────▼──────────┐ + │ 重新创建策略实例 │ + │ (stop → create → start)│ + └─────────────────────┘ +``` + +--- + +## 信号分发器 + +信号分发器是策略到交易执行模块之间的**解耦层**,提供事件总线能力。 + +### 设计目标 + +- **解耦**:策略不直接调用交易 API,只 emit 信号 +- **过滤**:同一 symbol 的冲突信号自动去重/合并 +- **优先级**:支持信号优先级排序(高置信度优先) +- **限流**:防止策略过度交易,内置冷却时间 +- **审计**:所有信号记录到数据库,可追溯 + +### 接口设计 + +```python +# engine/signals.py +import asyncio +from collections import defaultdict +from typing import Callable, Awaitable + +from .common import Signal + + +SignalHandler = Callable[[Signal], Awaitable[None]] + + +class SignalBus: + """信号事件总线""" + + def __init__(self): + self._handlers: dict[str, list[SignalHandler]] = defaultdict(list) + self._signal_history: list[Signal] = [] + self._cooldowns: dict[str, float] = {} # symbol -> 下次允许信号时间 + + def subscribe(self, event_type: str, handler: SignalHandler) -> None: + """ + 订阅信号类型。 + + event_type: "order" / "alert" / "log" 或自定义 + handler: async callable,接收 Signal 参数 + """ + ... + + async def emit(self, signal: Signal) -> None: + """ + 发布信号。会经过以下处理链: + 1. 冷却检查 + 2. 冲突检测 + 3. 优先级排序 + 4. 分发给所有订阅者 + 5. 记录到审计日志 + """ + ... + + def set_cooldown(self, symbol: str, seconds: float) -> None: + """设置 symbol 的下单冷却时间""" + ... + + def recent_signals(self, symbol: str, n: int = 10) -> list[Signal]: + """获取最近的信号历史""" + ... + + +class SignalFilter: + """信号过滤器基类(责任链模式)""" + + async def filter(self, signal: Signal) -> Signal | None: + """返回过滤后的信号,返回 None 表示丢弃""" + return signal + + +class CooldownFilter(SignalFilter): + """冷却时间过滤器""" + ... + + +class DuplicateFilter(SignalFilter): + """重复信号过滤器(同一 symbol 短期内相同方向合并)""" + ... + + +class ConfidenceFilter(SignalFilter): + """置信度过滤器(低于阈值的信号丢弃)""" + ... +``` + +### 信号流向 + +``` +策略A.on_kline() ──► Signal ──┐ +策略B.on_ticker() ─► Signal ──┤ +策略C.on_kline() ──► Signal ──┤ + ▼ + ┌────────────────┐ + │ SignalBus │ + │ │ + │ ┌──────────┐ │ + │ │ 冷却检查 │ │ + │ └────┬─────┘ │ + │ ┌────▼─────┐ │ + │ │ 去重合并 │ │ + │ └────┬─────┘ │ + │ ┌────▼─────┐ │ + │ │ 优先级排序│ │ + │ └────┬─────┘ │ + └───────┼────────┘ + │ + ┌─────────────┼─────────────┐ + ▼ ▼ ▼ + ┌──────────┐ ┌──────────┐ ┌──────────┐ + │ executor │ │ risk/ │ │ monitor/ │ + │ 交易执行 │ │ 风控 │ │ 日志 │ + └──────────┘ └──────────┘ └──────────┘ +``` + +--- + +## 回测引擎 + +回测引擎使用历史 K 线数据模拟策略执行,评估策略在历史行情中的表现。 + +### 设计目标 + +- **事件驱动**:模拟真实行情推送,按时间顺序逐根 K 线喂给策略 +- **精确模拟**:考虑手续费、滑点、最小下单量等实际交易约束 +- **多维度指标**:总收益率、年化收益、夏普比率、最大回撤、胜率、盈亏比 +- **可视化输出**:资金曲线图、回撤曲线、交易点标注 + +### 架构 + +``` + ┌──────────────────────────┐ + │ 回测引擎 │ + │ │ + 历史K线数据 ─────►│ ┌────────────────────┐ │ + (TimescaleDB) │ │ BacktestEngine │ │ + │ │ │ │ + 策略配置 ────────►│ │ - 加载历史K线 │ │ + │ │ - 逐根推送给策略 │ │ + 策略类 ──────────►│ │ - 模拟订单成交 │ │ + │ │ - 计算性能指标 │ │ + │ │ - 生成回测报告 │ │ + │ └────────────────────┘ │ + └──────────┬───────────────┘ + │ + ▼ + ┌──────────────────────────┐ + │ 回测结果 │ + │ - equity_curve │ + │ - trades │ + │ - metrics │ + │ - report (HTML/JSON) │ + └──────────────────────────┘ +``` + +### 接口设计 + +```python +# engine/backtest.py +from dataclasses import dataclass, field +from datetime import datetime +from typing import Optional, Type +from pydantic import BaseModel + +from .common import BaseStrategy, StrategyConfig + + +@dataclass +class BacktestConfig: + """回测配置""" + symbol: str + exchange: str = "binance" + interval: str = "1h" + start_time: datetime + end_time: datetime + + # 交易成本 + commission_pct: float = 0.001 # 手续费率 (0.1%) + slippage_pct: float = 0.0005 # 滑点 (0.05%) + min_order_qty: float = 0.001 # 最小下单量 + + # 资金 + initial_capital: float = 10_000.0 # 初始资金 + + # 数据 + warmup_bars: int = 100 # 预热 K 线条数 + + +@dataclass +class BacktestTrade: + """单笔回测交易记录""" + timestamp: datetime + symbol: str + side: str + price: float + quantity: float + commission: float + slippage: float + pnl: Optional[float] = None + reason: str = "" + + +@dataclass +class BacktestMetrics: + """回测性能指标""" + total_return_pct: float # 总收益率 + annual_return_pct: float # 年化收益率 + sharpe_ratio: float # 夏普比率 + sortino_ratio: float # 索提诺比率 + max_drawdown_pct: float # 最大回撤 + max_drawdown_duration: int # 最大回撤持续天数 + win_rate: float # 胜率 + profit_factor: float # 盈亏比 + total_trades: int # 总交易次数 + avg_holding_hours: float # 平均持仓时间(小时) + calmar_ratio: float # 卡尔玛比率 + + +@dataclass +class BacktestResult: + """完整回测结果""" + config: BacktestConfig + strategy_config: StrategyConfig + metrics: BacktestMetrics + trades: list[BacktestTrade] + equity_curve: list[dict] # [{timestamp, equity, drawdown}, ...] + + +class BacktestEngine: + """回测引擎""" + + def __init__(self, config: BacktestConfig): + self.config = config + self._klines: list = [] + self._trades: list[BacktestTrade] = [] + self._equity: list[dict] = [] + + async def load_data(self) -> None: + """从 TimescaleDB 加载历史 K 线数据""" + ... + + async def run( + self, + strategy_cls: Type[BaseStrategy], + strategy_config: StrategyConfig, + ) -> BacktestResult: + """ + 执行回测。 + + 流程: + 1. 预热:先喂 warmup_bars 根 K 线但不产生交易 + 2. 回测循环:逐根 K 线推送给策略 + 3. 收到 Signal 时模拟订单成交 + 4. 每根 K 线更新资金曲线 + 5. 计算最终指标 + """ + ... + + async def run_batch( + self, + strategy_cls: Type[BaseStrategy], + configs: list[StrategyConfig], + max_concurrency: int = 4, + ) -> list[BacktestResult]: + """批量回测(并行执行,用于参数扫描)""" + ... + + @staticmethod + def compute_metrics( + equity_curve: list[dict], + trades: list[BacktestTrade], + config: BacktestConfig, + ) -> BacktestMetrics: + """从资金曲线和交易记录计算性能指标""" + ... + + def generate_report(self, result: BacktestResult, format: str = "json") -> str: + """生成回测报告(json/html/markdown)""" + ... + + def plot(self, result: BacktestResult) -> None: + """绘制资金曲线和回撤图(matplotlib)""" + ... +``` + +### 回测执行流程 + +``` + 开始 + │ + ▼ + ┌─────────────────────┐ + │ 加载历史 K 线数据 │ + │ (TimescaleDB) │ + └──────────┬──────────┘ + │ + ▼ + ┌─────────────────────┐ + │ 预热阶段 │ + │ 喂入 warmup_bars │ + │ 策略初始化指标 │ + └──────────┬──────────┘ + │ + ▼ + ┌─────────────────────┐ + │ 回测主循环 │ + │ │ + │ for kline in data: │ + │ signal = strat. │ + │ on_kline(kline)│ + │ if signal: │ + │ simulate_trade │ + │ update_equity() │ + │ │ + └──────────┬──────────┘ + │ + ▼ + ┌─────────────────────┐ + │ 计算性能指标 │ + │ - 收益率 │ + │ - 夏普比率 │ + │ - 最大回撤 │ + │ - 胜率等 │ + └──────────┬──────────┘ + │ + ▼ + ┌─────────────────────┐ + │ 生成回测报告 │ + │ - JSON 结果 │ + │ - 资金曲线图 │ + │ - HTML 报告 │ + └─────────────────────┘ +``` + +### 交易模拟逻辑 + +``` +收到 Signal + │ + ▼ +┌──────────────┐ +│ 输入价格 = │ +│ kline.close │ +│ + slippage │ +└──────┬───────┘ + │ + ▼ +┌──────────────┐ +│ 检查余额 │──── 不足 ──► 跳过,记录日志 +│ 检查最小下单量 │ +└──────┬───────┘ + │ 通过 + ▼ +┌──────────────┐ +│ 执行成交 │ +│ - 更新持仓 │ +│ - 扣除手续费 │ +│ - 记录交易 │ +│ - 计算盈亏 │ +└──────────────┘ +``` + +--- + +## 参数优化器 + +参数优化器基于回测引擎,自动搜索最优策略参数组合。 + +### 支持的优化策略 + +| 方法 | 适用场景 | 特点 | +|------|---------|------| +| **网格搜索** | 参数空间小(2-3 维) | 穷举所有组合,结果完备但慢 | +| **随机搜索** | 参数空间中等 | 比网格搜索更高效 | +| **贝叶斯优化** | 参数空间大 | 利用先验知识,收敛快 | +| **遗传算法** | 复杂非线性参数空间 | 全局搜索能力强 | + +### 接口设计 + +```python +# engine/optimizer.py +from dataclasses import dataclass, field +from typing import Callable, Any + +from backtest import BacktestEngine, BacktestConfig, BacktestResult +from common import BaseStrategy, StrategyConfig + + +@dataclass +class ParamSpec: + """参数规格定义""" + name: str + type: str # "int" / "float" / "choice" + low: Optional[float] = None # 数值参数下界 + high: Optional[float] = None # 数值参数上界 + step: Optional[float] = None # 步长(网格搜索用) + choices: Optional[list[Any]] = None # 离散选择 + log_scale: bool = False # 是否对数尺度搜索 + + +@dataclass +class OptimizationConfig: + """优化器配置""" + method: str = "bayesian" # "grid" / "random" / "bayesian" / "genetic" + objective: str = "sharpe_ratio" # 优化目标指标 + max_iterations: int = 100 # 最大迭代次数 + n_jobs: int = 4 # 并行任务数 + early_stopping_rounds: int = 10 # 早停轮数 + cv_folds: int = 3 # 交叉验证折数 + + +@dataclass +class OptimizationResult: + """单次优化结果""" + params: dict[str, Any] # 最优参数 + score: float # 目标函数值 + all_trials: list[dict] # 所有试验记录 + best_result: BacktestResult # 最优参数的回测结果 + + +class ParamOptimizer: + """参数优化器""" + + def __init__( + self, + backtest_config: BacktestConfig, + strategy_cls: Type[BaseStrategy], + base_config: StrategyConfig, + param_specs: list[ParamSpec], + opt_config: OptimizationConfig = OptimizationConfig(), + ): + ... + + async def optimize(self) -> OptimizationResult: + """ + 执行参数优化。 + + 返回最优参数组合及对应的回测结果。 + """ + ... + + def suggest_params(self, n: int = 5) -> list[dict]: + """获取 top-n 参数组合建议""" + ... + + def plot_importance(self) -> None: + """绘制参数重要性图""" + ... + + def plot_parallel_coordinates(self) -> None: + """绘制参数平行坐标图""" + ... +``` + +### 优化流程 + +``` + ┌────────────────┐ + │ 定义参数空间 │ + │ ParamSpec[] │ + └───────┬────────┘ + │ + ▼ + ┌────────────────┐ + │ 选择优化算法 │ + │ (网格/贝叶斯等) │ + └───────┬────────┘ + │ + ┌────────▼────────┐ + │ 优化主循环 │ + │ │ + │ while n < max: │ + │ params = │ + │ suggest() │ + │ result = │ + │ backtest() │ + │ score = │ + │ metric() │ + │ update() │ + │ check_early()│ + │ │ + └────────┬────────┘ + │ + ▼ + ┌────────────────┐ + │ 返回最优结果 │ + │ + 所有试验记录 │ + └────────────────┘ +``` + +### 目标函数 + +``` +目标函数可选指标(最小化回撤时取负值): + +sharpe_ratio — 夏普比率(风险调整后收益,推荐) +sortino_ratio — 索提诺比率(只考虑下行波动) +total_return — 总收益率 +calmar_ratio — 卡尔玛比率(收益/最大回撤) +profit_factor — 盈亏比 +custom — 自定义组合指标 +``` + +--- + +## 数据流与生命周期 + +### 实时交易数据流 + +``` +Redis Pub/Sub engine/ executor/ +───────────── ───────── ───────── + +market:kline:* ──► RedisConsumer ──► Manager.feed_kline() + │ + ▼ + 路由到对应 symbol 策略 + │ + ┌─────┴─────┐ + ▼ ▼ + 策略A.on_kline 策略B.on_kline + │ │ + Signal? Signal? + │ │ + └─────┬─────┘ + ▼ + SignalBus.emit() + │ + ┌───────────┼───────────┐ + ▼ ▼ ▼ + 冷却检查 去重合并 优先级排序 + │ │ │ + └───────────┼───────────┘ + ▼ + ┌──────────┐ + │ risk/ │ ← 风控检查 + │ manager │ + └────┬─────┘ + │ passed + ▼ + ┌──────────┐ + │ executor │ ← 实际下单 + └──────────┘ + │ + ▼ + 订单状态回调 + │ + ▼ + Manager.notify_order_filled() + │ + ▼ + 策略.on_order_filled() +``` + +### 回测数据流 + +``` +TimescaleDB ──► BacktestEngine.load_data() + │ + ▼ + 历史 K 线列表 + │ + ▼ + BacktestEngine.run() + │ + ▼ + 逐根推送 K 线 ──► 策略.on_kline() + │ │ + │ Signal? + │ │ + ▼ ▼ + 更新资金曲线 simulate_trade() + │ + ▼ + 计算指标 ──► BacktestResult +``` + +### 引擎主循环 + +```python +# engine/main.py — 主程序入口示意 +import asyncio +from config import settings +from common.storage import TimescaleDBReader +from manager import StrategyManager +from signals import SignalBus + + +async def main(): + # 1. 初始化信号总线 + signal_bus = SignalBus() + + # 2. 初始化策略管理器 + manager = StrategyManager(signal_bus) + manager.discover_strategies("strategies") + + # 3. 创建并注册策略实例 + for cfg in settings.strategies: + manager.create(cfg.type, cfg) + + # 4. 启动所有策略 + await manager.start_all() + + # 5. 订阅 Redis 行情 + consumer = RedisMarketConsumer(settings.redis_url) + await consumer.subscribe_kline(manager.feed_kline) + await consumer.subscribe_ticker(manager.feed_ticker) + + # 6. 保持运行 + await asyncio.Event().wait() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +--- + +## 与上下游模块的交互 + +### 上游:TypeScript 数据模块 + +引擎通过 Redis Pub/Sub 消费 TS 侧推送的行情数据: + +| Redis 频道 | 数据类型 | 对应策略方法 | +|-----------|---------|-------------| +| `market:ticker:{exchange}:{symbol}` | Ticker | `on_ticker()` | +| `market:kline:{exchange}:{symbol}:{interval}` | Kline | `on_kline()` | +| `market:trade:{exchange}:{symbol}` | Trade | `on_trade()` | +| `market:orderbook:{exchange}:{symbol}` | OrderBook | `on_orderbook()` | + +数据格式使用 **MessagePack** 序列化,Python 侧用 `msgpack.unpackb()` 反序列化为 Pydantic 模型。 + +### 下游:交易执行模块 + +引擎通过 `SignalBus` 发布信号,executor 订阅: + +```python +# executor 侧订阅 +signal_bus.subscribe("order", executor.handle_signal) +``` + +### 下游:风控模块 + +风控模块也订阅信号总线,在 executor 执行前进行风控检查: + +```python +# risk 侧订阅 +signal_bus.subscribe("order", risk_manager.check_signal) +``` + +### 下游:监控模块 + +监控模块订阅所有信号用于记录和统计: + +```python +# monitor 侧订阅 +signal_bus.subscribe("*", monitor.record_signal) +``` + +--- + +## 目录结构 + +``` +engine/ +├── ENGINE.md # 本文档 +├── __init__.py +├── env.yaml # 引擎环境配置 +│ +├── common/ # 通用模块(基础数据类型、策略基类、日志) +│ ├── __init__.py +│ ├── base.py # BaseStrategy 抽象基类 + Signal / StrategyConfig +│ ├── models.py # 数据模型:Ticker / Kline / Trade / OrderBook +│ └── logger.py # 结构化日志模块 +│ +├── manager.py # StrategyManager 策略管理器 +├── signals.py # SignalBus 信号分发器 + SignalFilter 过滤器 +├── backtest.py # BacktestEngine 回测引擎 +├── optimizer.py # ParamOptimizer 参数优化器 +├── main.py # 引擎主循环入口 +│ +├── indicators/ # 技术指标计算(基于 TA-Lib / pandas_ta) +│ ├── __init__.py +│ ├── trend.py # 趋势指标 (MA, EMA, MACD, ADX...) +│ ├── momentum.py # 动量指标 (RSI, Stochastic, CCI...) +│ ├── volatility.py # 波动率指标 (Bollinger, ATR, Keltner...) +│ ├── volume.py # 成交量指标 (OBV, VWAP, MFI...) +│ └── composite.py # 复合指标(自定义组合) +│ +├── consumers/ # 行情消费器(Redis → 策略馈送) +│ ├── __init__.py +│ ├── redis.py # Redis Pub/Sub 消费者 +│ └── nats.py # NATS 消费者(可选) +│ +└── tests/ # 引擎测试 + ├── __init__.py + ├── test_base.py + ├── test_manager.py + ├── test_signals.py + ├── test_backtest.py + ├── test_optimizer.py + └── fixtures/ # 测试夹具(模拟行情数据) + └── sample_klines.json +``` + +--- + +## 开发步骤 + +### 第一阶段:核心框架(第 1 周) + +| 步骤 | 任务 | 产出 | +|------|------|------| +| 1.1 | 实现 `common/base.py` — `BaseStrategy`、`Signal`、`StrategyConfig` | 策略基类和类型定义 | +| 1.2 | 实现 `common/models.py` — `Ticker`、`Kline`、`Trade`、`OrderBook` Pydantic 模型 | 与 TS 对齐的数据模型 | +| 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 | +| 1.3 | 实现 `common/logger.py` — 结构化日志模块 | 统一日志输出 | +| 1.4 | 实现 `signals.py` — `SignalBus`、`CooldownFilter`、`DuplicateFilter` | 事件总线和过滤链 | +| 1.5 | 实现 `manager.py` — 策略注册、创建、启停 | 策略管理器 | +| 1.6 | 实现 `consumers/redis.py` — Redis Pub/Sub 消费者 | 实时行情接收 | +| 1.7 | 编写 `main.py` — 引擎主循环串联 | 端到端可运行 | + +### 第二阶段:指标与策略示例(第 2 周) + +| 步骤 | 任务 | 产出 | +|------|------|------| +| 2.1 | 实现 `indicators/` — MA、EMA、MACD、RSI、Bollinger | 技术指标库 | +| 2.2 | 编写示例策略 1:双均线交叉 | `strategies/ma_cross.py` | +| 2.3 | 编写示例策略 2:网格交易 | `strategies/grid_trading.py` | +| 2.4 | 集成测试:Redis 行情 → 策略 → 信号 | 端到端验证 | + +### 第三阶段:回测(第 3 周) + +| 步骤 | 任务 | 产出 | +|------|------|------| +| 3.1 | 实现 `backtest.py` — 回测引擎核心 | 事件驱动回测 | +| 3.2 | 实现交易模拟逻辑(手续费、滑点) | 精确回测 | +| 3.3 | 实现性能指标计算(夏普比率、最大回撤等) | 完整指标 | +| 3.4 | 实现回测报告生成(JSON/HTML) | 可视化报告 | +| 3.5 | 编写回测测试用例 | 回测验证 | + +### 第四阶段:参数优化(第 4 周) + +| 步骤 | 任务 | 产出 | +|------|------|------| +| 4.1 | 实现 `optimizer.py` — 网格搜索 | 穷举搜索 | +| 4.2 | 集成 Optuna 实现贝叶斯优化 | 智能搜索 | +| 4.3 | 实现交叉验证 | 防止过拟合 | +| 4.4 | 实现参数重要性分析 | 参数洞察 | + +--- + +## 实现规范 + +### 异步编程规范 + +- 所有 I/O 操作使用 `async/await` +- 长时间计算任务使用 `asyncio.to_thread()` 放入线程池 +- 避免在协程中阻塞(不用 `time.sleep`,用 `asyncio.sleep`) +- 策略的 `on_*` 方法必须是非阻塞的快速返回 + +### 错误处理 + +```python +# 策略错误隔离模式 +async def feed_kline(self, kline) -> None: + for name, strategy in self._strategies.items(): + if not strategy.is_running: + continue + if strategy.config.symbol != kline.symbol: + continue + try: + signal = await strategy.on_kline(kline) + if signal: + await self.signal_bus.emit(signal) + except Exception as e: + logger.error(f"Strategy {name} error: {e}", exc_info=True) + # 单策略异常不影响其他策略 +``` + +### 日志规范 + +```python +from common.logger import logger + +# 使用结构化日志 +logger.info("strategy_signal", strategy="ma_cross", symbol="BTCUSDT", + side="BUY", confidence=0.85, timestamp=1234567890) +logger.warning("strategy_error", strategy="grid", error=str(e)) +logger.info("backtest_complete", symbol="ETHUSDT", sharpe=1.45, trades=230) +``` + +### 类型规范 + +- 所有公开接口使用类型注解 +- 数据模型使用 Pydantic `BaseModel` +- 策略配置继承 `StrategyConfig` 并扩展 +- 信号必须通过 `Signal` 模型创建,禁止使用裸 dict + +### 测试规范 + +- 核心逻辑(SignalBus、StrategyManager)单元测试覆盖率 > 90% +- 策略逻辑有独立单元测试 +- 回测引擎用已知结果的静态数据集验证 +- 使用 `pytest-asyncio` 支持异步测试 + +### 性能基准 + +| 指标 | 目标值 | 说明 | +|------|--------|------| +| 策略单次 `on_kline` 耗时 | < 1ms | 不含 I/O 的纯计算 | +| SignalBus 发送延迟 | < 0.1ms | emit 到 handler 开始执行 | +| 回测速度 | > 10000 bars/s | 10 万根 K 线 < 10 秒 | +| 100 策略并行 | CPU < 50% | 空策略情况下的资源占用 | + +--- + +## 附录 + +### A. 示例策略实现(双均线交叉) + +```python +# strategies/ma_cross.py +from typing import Optional +from common import BaseStrategy, StrategyConfig, Signal + + +class MACrossConfig(StrategyConfig): + fast_period: int = 7 + slow_period: int = 25 + + +class MACrossStrategy(BaseStrategy): + strategy_type = "ma_cross" + + def __init__(self, config: MACrossConfig): + super().__init__(config) + self.config: MACrossConfig = config + self._prices: list[float] = [] + self._last_signal: Optional[str] = None # "BUY" / "SELL" + + def _sma(self, data: list[float], period: int) -> float: + if len(data) < period: + return 0.0 + return sum(data[-period:]) / period + + async def on_kline(self, kline) -> Optional[Signal]: + self._prices.append(kline.close) + + fast = self._sma(self._prices, self.config.fast_period) + slow = self._sma(self._prices, self.config.slow_period) + + if fast == 0 or slow == 0: + return None + + if fast > slow and self._last_signal != "BUY": + self._last_signal = "BUY" + return Signal( + symbol=self.config.symbol, + side="BUY", + signal_type="MARKET", + confidence=0.7, + reason=f"Golden cross: MA{self.config.fast_period} > MA{self.config.slow_period}", + timestamp=kline.timestamp, + ) + + if fast < slow and self._last_signal != "SELL": + self._last_signal = "SELL" + return Signal( + symbol=self.config.symbol, + side="SELL", + signal_type="MARKET", + confidence=0.7, + reason=f"Death cross: MA{self.config.fast_period} < MA{self.config.slow_period}", + timestamp=kline.timestamp, + ) + + return None +``` + +### B. 引擎配置示例(engine/env.yaml) + +```yaml +# engine/env.yaml +engine: + name: "trade-engine" + strategies_package: "strategies" + max_concurrent_strategies: 50 + +redis: + url: "redis://localhost:6379/0" + channels: + kline: "market:kline:*" + ticker: "market:ticker:*" + orderbook: "market:orderbook:*" + +signals: + cooldown_seconds: 1.0 # 同 symbol 下单冷却时间 + min_confidence: 0.3 # 最低信号置信度 + max_signals_per_minute: 60 # 每分钟最大信号数 + +backtest: + default_commission: 0.001 + default_slippage: 0.0005 + default_capital: 10000.0 + +optimizer: + default_method: "bayesian" + default_objective: "sharpe_ratio" + max_iterations: 200 + n_jobs: 4 + early_stopping: 20 +``` + +### C. 相关文档索引 + +| 文档 | 路径 | +|------|------| +| 系统总体架构 | `../README.md` | +| 数据模块开发指南 | `../data/ENGINE.md` (待创建) | +| 交易执行模块 | `../executor/ENGINE.md` (待创建) | +| 风控模块 | `../risk/ENGINE.md` (待创建) | +| 公共模块 | `./common/__init__.py` |