# Engine Module — Python 策略引擎架构文档 > **模块定位**:数字货币量化交易系统的业务层,负责策略开发、回测分析、信号生成,从 TimescaleDB 读取 data 模块持久化的 K 线数据进行策略决策。 > > **运行时**:Python 3.10+ | **依赖管理**:Poetry / uv | **数据库驱动**:asyncpg | **配置解析**:PyYAML --- ## 目录 1. [模块定位与架构边界](#1-模块定位与架构边界) 2. [目录结构](#2-目录结构) 3. [配置管理 — 读取 env.yaml](#3-配置管理--读取-envyaml) 4. [数据读取 — TimescaleDB K 线查询](#4-数据读取--timescaledb-k-线查询) 5. [实体映射 — Python ↔ TypeORM 类型对齐](#5-实体映射--python--typeorm-类型对齐) 6. [策略基类设计](#6-策略基类设计) 7. [回测引擎设计](#7-回测引擎设计) 8. [最小可运行示例](#8-最小可运行示例) 9. [性能考量](#9-性能考量) 10. [风险提示](#10-风险提示) --- ## 1. 模块定位与架构边界 ### 1.1 系统分层中的位置 ``` ┌──────────────────────────────────────────────────────────────┐ │ 🐍 Python Engine (本模块) │ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌───────────────────┐ │ │ │ 策略引擎 │ │ 回测引擎 │ │ 信号分发器 │ │ │ │ (strategy/) │ │ (backtest/) │ │ (signals/) │ │ │ └──────┬───────┘ └──────┬───────┘ └────────┬──────────┘ │ │ │ │ │ │ │ └─────────────────┼────────────────────┘ │ │ │ │ │ ┌────────────────────────▼──────────────────────────────┐ │ │ │ 数据访问层 (data/) │ │ │ │ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ │ │ TimescaleDB │ │ env.yaml │ │ Redis │ │ │ │ │ │ Reader │ │ Config │ │ Subscriber │ │ │ │ │ └─────────────┘ └──────────────┘ └──────────────┘ │ │ │ └───────────────────────────────────────────────────────┘ │ └──────────────────────────┬───────────────────────────────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ TimescaleDB │ │ env.yaml │ │ Redis │ │ (data 模块 │ │ (项目根目录) │ │ (行情消息) │ │ 写入的K线) │ │ │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ ``` ### 1.2 与 data 模块的关系 | 维度 | data 模块 (TypeScript) | engine 模块 (Python) | |------|----------------------|---------------------| | **职责** | 行情采集、K 线合成、数据持久化 | 策略计算、回测、信号生成 | | **数据库操作** | **写入**(UPSERT K 线到 TimescaleDB) | **只读**(SELECT K 线进行策略分析) | | **配置来源** | `data/config/index.ts` 读取 `env.yaml` | `engine/config.py` 读取同一份 `env.yaml` | | **实体定义** | TypeORM 装饰器实体 (`data/db/entities/`) | Pydantic/dataclass 模型(镜像 TypeORM 结构) | | **运行时** | Bun / Node.js | CPython 3.10+ | > **设计原则**:engine 模块对 TimescaleDB **只读**,绝不写入。K 线数据的写入由 data 模块全权负责,避免双端写入导致的数据一致性问题。 --- ## 2. 目录结构 ``` engine/ ├── README.md # 本架构文档 ├── pyproject.toml # Python 项目依赖(Poetry) ├── __init__.py │ ├── config/ # 配置管理 │ ├── __init__.py │ └── settings.py # 读取 ../../env.yaml,Pydantic 校验 │ ├── data/ # 数据访问层(只读 TimescaleDB) │ ├── __init__.py │ ├── db.py # asyncpg 连接池管理 │ ├── reader.py # K 线查询方法封装 │ └── models.py # Python 数据模型(镜像 TypeORM 实体) │ ├── strategy/ # 策略实现 │ ├── __init__.py │ ├── base.py # BaseStrategy 抽象基类 │ ├── ma_cross.py # 双均线交叉策略示例 │ └── grid.py # 网格交易策略示例 │ ├── backtest/ # 回测引擎 │ ├── __init__.py │ ├── engine.py # 事件驱动回测引擎 │ └── metrics.py # 绩效指标计算(夏普比率、最大回撤等) │ ├── signals/ # 信号分发 │ ├── __init__.py │ └── dispatcher.py # 信号生成与分发 │ ├── common/ # 公共工具 │ ├── __init__.py │ ├── logger.py # loguru 日志配置 │ └── utils.py # 时间工具、重试装饰器等 │ └── tests/ # 单元测试 ├── __init__.py ├── test_config.py ├── test_reader.py └── test_strategy.py ``` --- ## 3. 配置管理 — 读取 env.yaml ### 3.1 设计思路 engine 模块与 data 模块共享项目根目录的 `env.yaml`。Python 侧通过 `PyYAML` 解析,`Pydantic` 进行运行时校验——这与 TypeScript 侧 `data/config/index.ts` 使用 `yaml` + `zod` 的模式完全对称。 ### 3.2 配置文件结构回顾 `env.yaml`(项目根目录,data 与 engine 共享): ```yaml # --- TimescaleDB / PostgreSQL 连接 --- db: host: localhost port: 5432 name: trade user: trader password: fucketh # --- Redis 连接 --- redis: url: redis://localhost:6379 publish_enabled: true # --- 日志 --- logging: level: debug node_env: development ``` ### 3.3 Python 实现 ```python # engine/config/settings.py """中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。""" from pathlib import Path from typing import Literal import yaml from pydantic import BaseModel, Field, field_validator # ============================================================ # 1. Pydantic 校验模型(镜像 TypeScript config/validators.ts) # ============================================================ class DbConfig(BaseModel): """TimescaleDB / PostgreSQL 连接参数""" host: str = "localhost" port: int = Field(default=5432, ge=1, le=65535) name: str # 对应 YAML 中的 name user: str password: str class RedisConfig(BaseModel): """Redis 连接配置""" url: str = "redis://localhost:6379" publish_enabled: bool = True class LoggingConfig(BaseModel): """日志配置""" level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info" node_env: Literal["development", "production"] = "development" class EnvConfig(BaseModel): """env.yaml 顶层结构""" db: DbConfig redis: RedisConfig logging: LoggingConfig # ============================================================ # 2. 定位并加载 env.yaml # ============================================================ def _get_project_root() -> Path: """ 计算项目根目录的绝对路径。 engine/config/settings.py → engine/config/ → engine/ → / """ return Path(__file__).resolve().parent.parent.parent def _load_yaml_config() -> dict: """读取项目根目录 env.yaml,返回原始字典。文件不存在时抛出明确错误。""" root = _get_project_root() yaml_path = root / "env.yaml" if not yaml_path.exists(): raise FileNotFoundError( f"[config] 无法读取配置文件: {yaml_path}\n" f"请确保项目根目录存在 env.yaml。" ) with open(yaml_path, "r", encoding="utf-8") as f: data = yaml.safe_load(f) if data is None: raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}") return data # ============================================================ # 3. 加载 & 校验 & 导出 # ============================================================ _raw = _load_yaml_config() _config = EnvConfig.model_validate(_raw) # --- 按职责分组的导出配置对象 --- # TimescaleDB 连接参数(可直接用于 asyncpg.create_pool) db = _config.db # Redis 连接参数 redis = _config.redis # 日志参数 logging = _config.logging # --- asyncpg DSN(便捷构造)--- DB_DSN = ( f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" ) def print_config_summary() -> None: """打印脱敏后的配置概要(不含密码明文)""" print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}") print(f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}") print(f"[config] Logging: level={logging.level}, env={logging.node_env}") ``` ### 3.4 与 TypeScript 侧对比 | 步骤 | TypeScript (`data/config/index.ts`) | Python (`engine/config/settings.py`) | |------|--------------------------------------|--------------------------------------| | **路径计算** | `fileURLToPath(import.meta.url)` → `../..` | `Path(__file__).resolve().parent.parent.parent` | | **YAML 解析** | `parse()` from `yaml` npm 包 | `yaml.safe_load()` from PyYAML | | **类型校验** | `validateConfig()` 使用 `zod` | `EnvConfig.model_validate()` 使用 Pydantic | | **导出方式** | 分组 `const` 对象 (`pgsql`, `redis`, `logging`) | 分组模块级变量 (`db`, `redis`, `logging`) | | **DSN 构造** | 不构造,由 TypeORM DataSource 使用拆解字段 | 额外导出 `DB_DSN` 供 asyncpg 使用 | --- ## 4. 数据读取 — TimescaleDB K 线查询 ### 4.1 连接池管理 ```python # engine/data/db.py """asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。""" import asyncpg from engine.config.settings import db as db_config, DB_DSN # 模块级连接池(单例) _pool: asyncpg.Pool | None = None async def get_pool() -> asyncpg.Pool: """获取或创建 asyncpg 连接池(懒初始化,复用)。""" global _pool if _pool is None: _pool = await asyncpg.create_pool( dsn=DB_DSN, min_size=2, max_size=10, command_timeout=30, # 查询超时 30 秒 # 只读模式:在连接级别设置,防止误写 server_settings={"default_transaction_read_only": "on"}, ) return _pool async def close_pool() -> None: """关闭连接池(应用退出时调用)""" global _pool if _pool: await _pool.close() _pool = None ``` ### 4.2 K 线读取器 ```python # engine/data/reader.py """ K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。 所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。 参考:data/db/entities/kline.entity.ts """ from datetime import datetime from typing import Optional, Sequence import asyncpg from engine.data.db import get_pool from engine.data.models import KlineRecord class KlineReader: """TimescaleDB K 线只读查询器""" def __init__(self): self._pool: Optional[asyncpg.Pool] = None async def _ensure_pool(self) -> asyncpg.Pool: if self._pool is None: self._pool = await get_pool() return self._pool # ---------------------------------------------------------- # 核心查询方法 # ---------------------------------------------------------- async def get_klines( self, symbol: str, interval: str, start_time: datetime, end_time: datetime, exchange: str = "binance", limit: int = 1000, ) -> list[KlineRecord]: """ 查询指定时间范围内的 K 线数据。 字段映射 — data/db/entities/kline.entity.ts: time → Kline.time (timestamptz) exchange → Kline.exchange (text) symbol → Kline.symbol (text) interval → Kline.interval (text) open → Kline.open (numeric) high → Kline.high (numeric) low → Kline.low (numeric) close → Kline.close (numeric) volume → Kline.volume (numeric) quote_volume → Kline.quote_volume (nullable) trade_count → Kline.trade_count (nullable) is_closed → Kline.is_closed (boolean) """ pool = await self._ensure_pool() query = """ SELECT time, exchange, symbol, interval, open, high, low, close, volume, quote_volume, taker_buy_base_vol, taker_buy_quote_vol, trade_count, is_closed FROM klines WHERE exchange = $1 AND symbol = $2 AND interval = $3 AND time >= $4 AND time <= $5 ORDER BY time ASC LIMIT $6 """ rows: Sequence[asyncpg.Record] = await pool.fetch( query, exchange, symbol, interval, start_time, end_time, limit, ) return [KlineRecord.from_record(row) for row in rows] async def get_latest_klines( self, symbol: str, interval: str, exchange: str = "binance", limit: int = 500, ) -> list[KlineRecord]: """ 获取最近 N 根已闭合的 K 线(策略启动时快速预热)。 仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。 """ pool = await self._ensure_pool() query = """ SELECT time, exchange, symbol, interval, open, high, low, close, volume, quote_volume, taker_buy_base_vol, taker_buy_quote_vol, trade_count, is_closed FROM klines WHERE exchange = $1 AND symbol = $2 AND interval = $3 AND is_closed = TRUE ORDER BY time DESC LIMIT $4 """ rows = await pool.fetch(query, exchange, symbol, interval, limit) # 正序返回(时间升序) records = [KlineRecord.from_record(row) for row in rows] records.reverse() return records async def get_ohlcv_array( self, symbol: str, interval: str, exchange: str = "binance", limit: int = 200, ) -> list[tuple[datetime, float, float, float, float, float]]: """ 获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。 返回格式: [(timestamp, open, high, low, close, volume), ...] 时间升序排列。 """ pool = await self._ensure_pool() query = """ SELECT time, open, high, low, close, volume FROM klines WHERE exchange = $1 AND symbol = $2 AND interval = $3 ORDER BY time DESC LIMIT $4 """ rows = await pool.fetch(query, exchange, symbol, interval, limit) return [ (row["time"], row["open"], row["high"], row["low"], row["close"], row["volume"]) for row in reversed(rows) ] async def get_available_symbols( self, exchange: str = "binance", ) -> list[str]: """查询 TimescaleDB 中有 K 线数据的交易对列表。""" pool = await self._ensure_pool() rows = await pool.fetch( """ SELECT DISTINCT symbol FROM klines WHERE exchange = $1 ORDER BY symbol """, exchange, ) return [row["symbol"] for row in rows] ``` ### 4.3 查询模式与索引对齐 Kline 实体的复合主键 `(exchange, symbol, interval, time)` 决定了最常用的查询模式。以下查询均能命中主键索引: | 查询场景 | SQL 条件 | 索引命中 | |---------|---------|---------| | 回测:某交易对某周期的时间范围 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND time>=$4 AND time<=$5 ORDER BY time` | ✅ 主键索引前导列精确匹配 | | 最新 K 线:已闭合的最近 N 根 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND is_closed=TRUE ORDER BY time DESC LIMIT N` | ✅ 主键索引 + 顺序扫描(LIMIT 小) | | 可用交易对枚举 | `SELECT DISTINCT symbol WHERE exchange=$1` | ⚠️ 全表扫描 — 建议从 `trading_pairs` 表获取 | > **性能建议**:`get_available_symbols()` 应优先查询 `trading_pairs` 表(关系数据,TypeORM 管理域),而非扫描 `klines` 时序表。 --- ## 5. 实体映射 — Python ↔ TypeORM 类型对齐 ### 5.1 映射策略 Python 侧的数据模型镜像 TypeScript 侧 TypeORM 实体,保证双端数据结构一致性。映射规则: ``` TypeScript (TypeORM Entity) → Python (Pydantic Model) ════════════════════════════ ════════════════════════ @PrimaryColumn() timestamptz → datetime @Column("text") → str @Column("numeric", 20, 8) → float (Python float = C double, 足够 20 位精度) @Column("boolean") → bool @Column("integer") → int @Column({ nullable: true }) → Optional[...] ``` ### 5.2 Python 数据模型 ```python # engine/data/models.py """ Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。 命名约定: - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair) - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示 """ from datetime import datetime from typing import Optional from pydantic import BaseModel, Field # ============================================================ # Kline 模型 — 镜像 data/db/entities/kline.entity.ts # ============================================================ class KlineRecord(BaseModel): """ K 线数据记录。 映射关系: TypeORM Kline.time → KlineRecord.time TypeORM Kline.exchange → KlineRecord.exchange TypeORM Kline.symbol → KlineRecord.symbol TypeORM Kline.interval → KlineRecord.interval TypeORM Kline.open → KlineRecord.open TypeORM Kline.high → KlineRecord.high TypeORM Kline.low → KlineRecord.low TypeORM Kline.close → KlineRecord.close TypeORM Kline.volume → KlineRecord.volume TypeORM Kline.quote_volume → KlineRecord.quote_volume TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol TypeORM Kline.trade_count → KlineRecord.trade_count TypeORM Kline.is_closed → KlineRecord.is_closed """ time: datetime exchange: str symbol: str interval: str # OHLCV open: float high: float low: float close: float volume: float # 扩展字段(nullable → Optional) quote_volume: Optional[float] = None taker_buy_base_vol: Optional[float] = None taker_buy_quote_vol: Optional[float] = None trade_count: Optional[int] = None # 状态 is_closed: bool = True @classmethod def from_record(cls, record) -> "KlineRecord": """ 从 asyncpg.Record 构造 KlineRecord 实例。 asyncpg 返回的 Record 对象行为类似 dict, 数值列自动转为 Python float/int,timestamptz 自动转为 datetime。 """ return cls( time=record["time"], exchange=record["exchange"], symbol=record["symbol"], interval=record["interval"], open=float(record["open"]), high=float(record["high"]), low=float(record["low"]), close=float(record["close"]), volume=float(record["volume"]), quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None, taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None, taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None, trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None, is_closed=bool(record["is_closed"]), ) # ============================================================ # TradingPair 模型 — 镜像 data/db/entities/trading-pair.entity.ts # ============================================================ class TradingPairInfo(BaseModel): """ 交易对配置信息(轻量版,仅包含策略决策所需的字段)。 完整实体参考 data/db/entities/trading-pair.entity.ts """ symbol: str exchange: str base_asset: str quote_asset: str price_precision: int quantity_precision: int min_qty: Optional[float] = None min_notional: Optional[float] = None active: bool = True # ============================================================ # 策略信号模型 # ============================================================ class Signal(BaseModel): """ 交易信号。 策略 generate_signal() 方法的返回值, 由 BaseStrategy.do_action() 消费执行。 """ symbol: str signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$") price: Optional[float] = None # 限价单价格(None = 市价单) quantity: Optional[float] = None # 下单数量(None = 使用风控模块计算的默认值) reason: str = "" # 信号产生原因(便于日志审计) timestamp: datetime = Field(default_factory=datetime.utcnow) ``` ### 5.3 字段对应速查表 | TypeORM Entity (`data/db/entities/kline.entity.ts`) | Python Model (`engine/data/models.py`) | DB 列类型 | Python 类型 | |-----------------------------------------------------|----------------------------------------|----------|------------| | `time` (PrimaryColumn, timestamptz) | `KlineRecord.time` | `TIMESTAMPTZ` | `datetime` | | `exchange` (PrimaryColumn, text) | `KlineRecord.exchange` | `TEXT` | `str` | | `symbol` (PrimaryColumn, text) | `KlineRecord.symbol` | `TEXT` | `str` | | `interval` (PrimaryColumn, text) | `KlineRecord.interval` | `TEXT` | `str` | | `open` (numeric 20,8) | `KlineRecord.open` | `NUMERIC(20,8)` | `float` | | `high` (numeric 20,8) | `KlineRecord.high` | `NUMERIC(20,8)` | `float` | | `low` (numeric 20,8) | `KlineRecord.low` | `NUMERIC(20,8)` | `float` | | `close` (numeric 20,8) | `KlineRecord.close` | `NUMERIC(20,8)` | `float` | | `volume` (numeric 20,8) | `KlineRecord.volume` | `NUMERIC(20,8)` | `float` | | `quote_volume` (nullable) | `KlineRecord.quote_volume` | `NUMERIC(20,8)` | `Optional[float]` | | `taker_buy_base_vol` (nullable) | `KlineRecord.taker_buy_base_vol` | `NUMERIC(20,8)` | `Optional[float]` | | `taker_buy_quote_vol` (nullable) | `KlineRecord.taker_buy_quote_vol` | `NUMERIC(20,8)` | `Optional[float]` | | `trade_count` (nullable) | `KlineRecord.trade_count` | `INTEGER` | `Optional[int]` | | `is_closed` | `KlineRecord.is_closed` | `BOOLEAN` | `bool` | --- ## 6. 策略基类设计 ```python # engine/strategy/base.py """策略抽象基类 —— 所有交易策略的父类。""" from abc import ABC, abstractmethod from datetime import datetime from typing import Optional from engine.data.models import KlineRecord, Signal class BaseStrategy(ABC): """ 策略基类。 子类必须实现: - generate_signal(): 基于 K 线数据生成交易信号 - on_kline(): K 线到达时的回调(可在此更新内部状态) 生命周期: 1. __init__() — 初始化策略参数和内状态 2. on_kline() — 每根新 K 线到达时调用(更新内部指标状态) 3. generate_signal() — 被回测引擎/实盘调度器调用,生成买卖信号 """ def __init__(self, symbol: str, interval: str, **kwargs): self.symbol = symbol self.interval = interval self._klines: list[KlineRecord] = [] # 内部 K 线缓存 self._last_signal: Optional[Signal] = None @abstractmethod async def generate_signal(self) -> Optional[Signal]: """ 基于当前缓存的 K 线数据生成交易信号。 Returns: Signal(BUY/SELL) — 触发交易 None — 无信号,继续持仓/观望 """ ... async def on_kline(self, kline: KlineRecord) -> None: """ 新 K 线到达回调(默认实现:追加到缓存)。 子类可重写以更新技术指标缓存(如 MA、RSI 中间计算结果)。 """ self._klines.append(kline) async def warm_up(self, klines: list[KlineRecord]) -> None: """ 策略预热:加载历史 K 线,初始化内部状态。 回测引擎在开始回测前调用;实盘调度器在策略启动时调用。 """ self._klines = sorted(klines, key=lambda k: k.time) @property def is_ready(self) -> bool: """策略是否已就绪(缓存了足够的历史 K 线)""" return len(self._klines) >= self.min_klines_required @property @abstractmethod def min_klines_required(self) -> int: """策略正常运行所需的最少 K 线数量(如双均线策略需要 slow_period+1 根)""" ... ``` --- ## 7. 回测引擎设计 ```python # engine/backtest/engine.py """ 事件驱动回测引擎。 核心流程: 1. 从 TimescaleDB 加载历史 K 线 2. 按时间顺序逐根喂给策略 3. 收集信号 → 模拟成交 → 计算收益 4. 输出绩效报告(夏普比率、最大回撤、胜率等) """ from dataclasses import dataclass, field from datetime import datetime from engine.data.reader import KlineReader from engine.data.models import KlineRecord, Signal from engine.strategy.base import BaseStrategy @dataclass class BacktestResult: """回测结果""" symbol: str interval: str start_time: datetime end_time: datetime total_klines: int total_signals: int total_return: float # 总收益率 annual_return: float # 年化收益率 max_drawdown: float # 最大回撤 sharpe_ratio: float # 夏普比率 win_rate: float # 胜率 profit_factor: float # 盈亏比 equity_curve: list[float] = field(default_factory=list) # 权益曲线 class BacktestEngine: """回测引擎""" def __init__(self, reader: KlineReader): self.reader = reader async def run( self, strategy: BaseStrategy, symbol: str, interval: str, start_time: datetime, end_time: datetime, exchange: str = "binance", ) -> BacktestResult: """ 执行回测。 Args: strategy: 待回测的策略实例 symbol: 交易对 interval: K 线周期 start_time: 回测起始时间 end_time: 回测结束时间 exchange: 交易所 """ # 1. 加载历史 K 线 klines = await self.reader.get_klines( symbol=symbol, interval=interval, start_time=start_time, end_time=end_time, exchange=exchange, ) if len(klines) == 0: raise ValueError(f"回测区间内无 K 线数据: {symbol} {interval} [{start_time}, {end_time}]") # 2. 策略预热 await strategy.warm_up(klines) # 3. 逐根 K 线回放 signals: list[Signal] = [] for kline in klines: await strategy.on_kline(kline) if strategy.is_ready: signal = await strategy.generate_signal() if signal is not None: signals.append(signal) # 4. 计算绩效指标 return self._compute_metrics( symbol=symbol, interval=interval, start_time=start_time, end_time=end_time, klines=klines, signals=signals, ) def _compute_metrics( self, symbol: str, interval: str, start_time: datetime, end_time: datetime, klines: list[KlineRecord], signals: list[Signal], ) -> BacktestResult: """计算回测绩效指标(框架方法,实际计算委托 metrics.py)""" from engine.backtest.metrics import compute_metrics return compute_metrics( symbol=symbol, interval=interval, start_time=start_time, end_time=end_time, klines=klines, signals=signals, ) ``` --- ## 8. 最小可运行示例 ### 8.1 安装依赖 ```bash # engine/pyproject.toml [tool.poetry.dependencies] python = "^3.10" asyncpg = "^0.29" pydantic = "^2.5" pyyaml = "^6.0" loguru = "^0.7" pandas = "^2.0" numpy = "^1.26" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" ``` ```bash cd engine poetry install ``` ### 8.2 运行示例 ```python # engine/__main__.py """ 最小可运行示例 —— 验证 TimescaleDB 连接、配置读取、K 线查询。 运行方式: cd engine poetry run python -m engine """ import asyncio from datetime import datetime, timedelta, timezone from engine.config.settings import print_config_summary, db as db_config from engine.data.db import get_pool, close_pool from engine.data.reader import KlineReader async def main(): # 1. 打印配置概要(验证 env.yaml 读取) print_config_summary() # 2. 测试 TimescaleDB 连接 pool = await get_pool() async with pool.acquire() as conn: version = await conn.fetchval("SELECT version()") print(f"[db] TimescaleDB 连接成功: {version}") # 3. 查询最近 10 根 BTCUSDT 1h K 线 reader = KlineReader() end = datetime.now(timezone.utc) start = end - timedelta(hours=24) klines = await reader.get_klines( symbol="BTCUSDT", interval="1h", start_time=start, end_time=end, exchange="binance", limit=10, ) print(f"\n[data] 查询到 {len(klines)} 根 K 线:") for k in klines[:5]: print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} " f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}") # 4. 清理 await close_pool() if __name__ == "__main__": asyncio.run(main()) ``` ### 8.3 完整启动流程 ```python # engine/run.py """策略引擎完整启动流程示例""" import asyncio from datetime import datetime, timedelta, timezone from engine.config.settings import print_config_summary from engine.data.db import get_pool, close_pool from engine.data.reader import KlineReader from engine.strategy.ma_cross import MACrossStrategy # 双均线策略示例 from engine.backtest.engine import BacktestEngine async def run_backtest(): print_config_summary() reader = KlineReader() engine = BacktestEngine(reader) # 策略参数 strategy = MACrossStrategy( symbol="BTCUSDT", interval="1h", fast_period=5, # 快线周期 slow_period=20, # 慢线周期 ) # 回测区间:最近 90 天 end = datetime.now(timezone.utc) start = end - timedelta(days=90) print(f"\n[backtest] 开始回测: {strategy.symbol} {strategy.interval} " f"[{start.date()} ~ {end.date()}]") result = await engine.run( strategy=strategy, symbol="BTCUSDT", interval="1h", start_time=start, end_time=end, ) # 输出绩效 print(f"\n{'='*50}") print(f"回测绩效报告") print(f"{'='*50}") print(f" K 线总数: {result.total_klines}") print(f" 交易信号数: {result.total_signals}") print(f" 总收益率: {result.total_return:.2%}") print(f" 年化收益率: {result.annual_return:.2%}") print(f" 最大回撤: {result.max_drawdown:.2%}") print(f" 夏普比率: {result.sharpe_ratio:.2f}") print(f" 胜率: {result.win_rate:.2%}") print(f" 盈亏比: {result.profit_factor:.2f}") print(f"{'='*50}") await close_pool() if __name__ == "__main__": asyncio.run(run_backtest()) ``` --- ## 9. 性能考量 ### 9.1 查询优化 | 场景 | 优化策略 | 效果 | |------|---------|------| | **大数据量回测**(> 10 万根 K 线) | 使用 `asyncpg.Cursor` 流式读取,避免一次性加载到内存 | 内存峰值从 O(n) 降到 O(batch_size) | | **多币种并发查询** | `asyncio.gather()` 并行查询多个 symbol | N 个币种查询时间 ≈ max(单币种时间),而非 sum | | **重复区间查询** | 策略层缓存已查询的 K 线,避免重复 DB 查询 | 减少 90% 的重复 I/O | | **高周期 K 线** | 若 TimescaleDB 配置了连续聚合视图,直接查询物化视图而非原始 1m 表 | 查询速度提升 10-100× | ### 9.2 内存管理 ```python # 流式读取大数据量 K 线(避免 OOM) async def get_klines_streaming( pool: asyncpg.Pool, symbol: str, interval: str, start_time: datetime, end_time: datetime, batch_size: int = 5000, ): """流式读取 K 线,每次返回 batch_size 条,适合百万级回测。""" async with pool.acquire() as conn: async with conn.transaction(): cursor = await conn.cursor( """ SELECT * FROM klines WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND time>=$4 AND time<=$5 ORDER BY time ASC """ ) # 省略参数绑定细节... async for row in cursor: yield KlineRecord.from_record(row) ``` ### 9.3 向量化计算建议 策略中的技术指标计算(MA、RSI、MACD 等)若涉及全量历史数据,应使用 `pandas` + `numpy` 向量化计算,而非 Python 循环: ```python import pandas as pd import numpy as np def compute_sma_vectorized(klines: list[KlineRecord], period: int) -> np.ndarray: """向量化计算简单移动平均 — 比纯 Python 循环快 50-100×""" closes = np.array([k.close for k in klines], dtype=np.float64) return pd.Series(closes).rolling(window=period).mean().to_numpy() ``` --- ## 10. 风险提示 > ⚠️ **风险声明**:数字货币交易具有高风险,本系统仅提供技术执行工具。策略盈亏由使用者自负,开发者不做任何收益承诺。 ### 10.1 数据相关风险 | 风险 | 说明 | 缓解措施 | |------|------|---------| | **数据缺口** | TimescaleDB 中 K 线可能因 data 模块采集中断而存在缺口 | 策略内检测 `time` 间隔,缺口处跳过或标记 | | **延迟数据** | 网络延迟导致最新 K 线不完整 | `is_closed=False` 的 K 线不应参与信号计算 | | **精度损失** | Python `float`(IEEE 754 double)约 15-17 位有效数字,对标 `NUMERIC(20,8)` 足够,但极值场景需使用 `Decimal` | 价格/成交量使用 `float`;累积资金计算使用 `Decimal` | ### 10.2 策略相关风险 | 风险 | 说明 | 缓解措施 | |------|------|---------| | **过拟合** | 参数在回测区间表现优异但实盘失效 | 使用 Walk-Forward 分析,保留样本外验证集 | | **未来信息泄露** | 回测中不当使用了当前时间点之后的数据 | `generate_signal()` 仅允许访问截止当前 K 线的历史数据 | | **滑点与手续费** | 回测未考虑实际交易成本 | 回测引擎中配置滑点模型和手续费率 | ### 10.3 跨模块一致性 | 检查项 | 验证方法 | |--------|---------| | Python 模型字段与 TypeORM 实体对齐 | 对照 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts) 逐一核对 | | env.yaml 配置项双端一致 | 对照 [`data/config/index.ts`](../data/config/index.ts) 的 `EnvConfig` 类型 | | K 线查询 SQL 的列名与实体一致 | 运行 `__main__.py` 示例验证查询返回数据 | --- ## 附录 A:依赖清单 ```toml # engine/pyproject.toml [tool.poetry] name = "trade-engine" version = "0.1.0" description = "数字货币量化交易系统 - Python 策略引擎" authors = ["Trade Team"] [tool.poetry.dependencies] python = "^3.10" asyncpg = "^0.29" # 异步 PostgreSQL 驱动(TimescaleDB 兼容) pydantic = "^2.5" # 运行时类型校验(与 TypeScript Zod 对称) pyyaml = "^6.0" # 解析 env.yaml(与 TypeScript yaml 包对称) loguru = "^0.7" # 结构化日志 pandas = "^2.0" # 技术指标向量化计算 numpy = "^1.26" # 数值计算基础 [tool.poetry.group.dev.dependencies] pytest = "^8.0" pytest-asyncio = "^0.23" mypy = "^1.8" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" ``` --- ## 附录 B:TypeScript 实体速查 > 以下为 data 模块实体的关键字段摘要,供 Python 开发时快速对照。 > 完整定义见 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts)。 | 实体 | 表名 | 主键 | 备注 | |------|------|------|------| | `Kline` | `klines` | `(exchange, symbol, interval, time)` | TimescaleDB hypertable,复合主键 | | `Exchange` | `exchanges` | `id` (UUID) | TypeORM 管理域,继承 CommonBaseEntity | | `TradingPair` | `trading_pairs` | `id` (UUID) | TypeORM 管理域,外键关联 exchanges |