Files
trade/engine
Rekey 1c9339a4db feat(engine): 新增 Python 策略引擎模块
- config/settings.py:Pydantic 解析 env.yaml
- data/db.py:asyncpg 连接池管理
- data/reader.py:KlineReader 只读查询 TimescaleDB
- data/models.py:KlineRecord 等 Pydantic 模型,镜像 TypeORM 实体
- example/test_db.py:数据库查询验证示例
- README.md:引擎架构文档
2026-06-08 18:19:50 +08:00
..

Engine Module — Python 策略引擎架构文档

模块定位:数字货币量化交易系统的业务层,负责策略开发、回测分析、信号生成,从 TimescaleDB 读取 data 模块持久化的 K 线数据进行策略决策。

运行时Python 3.10+ | 依赖管理Poetry / uv | 数据库驱动asyncpg | 配置解析PyYAML


目录

  1. 模块定位与架构边界
  2. 目录结构
  3. 配置管理 — 读取 env.yaml
  4. 数据读取 — TimescaleDB K 线查询
  5. 实体映射 — Python ↔ TypeORM 类型对齐
  6. 策略基类设计
  7. 回测引擎设计
  8. 最小可运行示例
  9. 性能考量
  10. 风险提示

1. 模块定位与架构边界

1.1 系统分层中的位置

┌──────────────────────────────────────────────────────────────┐
│                     🐍 Python Engine (本模块)                  │
│                                                               │
│  ┌──────────────┐  ┌──────────────┐  ┌───────────────────┐  │
│  │  策略引擎     │  │  回测引擎     │  │  信号分发器        │  │
│  │  (strategy/)  │  │  (backtest/) │  │  (signals/)       │  │
│  └──────┬───────┘  └──────┬───────┘  └────────┬──────────┘  │
│         │                 │                    │              │
│         └─────────────────┼────────────────────┘              │
│                           │                                   │
│  ┌────────────────────────▼──────────────────────────────┐   │
│  │              数据访问层 (data/)                         │   │
│  │  ┌─────────────┐  ┌──────────────┐  ┌──────────────┐  │   │
│  │  │ TimescaleDB │  │  env.yaml    │  │  Redis        │  │   │
│  │  │ Reader      │  │  Config      │  │  Subscriber   │  │   │
│  │  └─────────────┘  └──────────────┘  └──────────────┘  │   │
│  └───────────────────────────────────────────────────────┘   │
└──────────────────────────┬───────────────────────────────────┘
                           │
        ┌──────────────────┼──────────────────┐
        │                  │                  │
        ▼                  ▼                  ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ TimescaleDB  │  │  env.yaml    │  │    Redis     │
│ (data 模块    │  │  (项目根目录) │  │  (行情消息)   │
│  写入的K线)   │  │              │  │              │
└──────────────┘  └──────────────┘  └──────────────┘

1.2 与 data 模块的关系

维度 data 模块 (TypeScript) engine 模块 (Python)
职责 行情采集、K 线合成、数据持久化 策略计算、回测、信号生成
数据库操作 写入UPSERT K 线到 TimescaleDB 只读SELECT K 线进行策略分析)
配置来源 data/config/index.ts 读取 env.yaml engine/config.py 读取同一份 env.yaml
实体定义 TypeORM 装饰器实体 (data/db/entities/) Pydantic/dataclass 模型(镜像 TypeORM 结构)
运行时 Bun / Node.js CPython 3.10+

设计原则engine 模块对 TimescaleDB 只读,绝不写入。K 线数据的写入由 data 模块全权负责,避免双端写入导致的数据一致性问题。


2. 目录结构

engine/
├── README.md                    # 本架构文档
├── pyproject.toml               # Python 项目依赖(Poetry
├── __init__.py
│
├── config/                      # 配置管理
│   ├── __init__.py
│   └── settings.py              # 读取 ../../env.yamlPydantic 校验
│
├── data/                        # 数据访问层(只读 TimescaleDB
│   ├── __init__.py
│   ├── db.py                    # asyncpg 连接池管理
│   ├── reader.py                # K 线查询方法封装
│   └── models.py                # Python 数据模型(镜像 TypeORM 实体)
│
├── strategy/                    # 策略实现
│   ├── __init__.py
│   ├── base.py                  # BaseStrategy 抽象基类
│   ├── ma_cross.py              # 双均线交叉策略示例
│   └── grid.py                  # 网格交易策略示例
│
├── backtest/                    # 回测引擎
│   ├── __init__.py
│   ├── engine.py                # 事件驱动回测引擎
│   └── metrics.py               # 绩效指标计算(夏普比率、最大回撤等)
│
├── signals/                     # 信号分发
│   ├── __init__.py
│   └── dispatcher.py            # 信号生成与分发
│
├── common/                      # 公共工具
│   ├── __init__.py
│   ├── logger.py                # loguru 日志配置
│   └── utils.py                 # 时间工具、重试装饰器等
│
└── tests/                       # 单元测试
    ├── __init__.py
    ├── test_config.py
    ├── test_reader.py
    └── test_strategy.py

3. 配置管理 — 读取 env.yaml

3.1 设计思路

engine 模块与 data 模块共享项目根目录的 env.yaml。Python 侧通过 PyYAML 解析,Pydantic 进行运行时校验——这与 TypeScript 侧 data/config/index.ts 使用 yaml + zod 的模式完全对称。

3.2 配置文件结构回顾

env.yaml(项目根目录,data 与 engine 共享):

# --- TimescaleDB / PostgreSQL 连接 ---
db:
  host: localhost
  port: 5432
  name: trade
  user: trader
  password: fucketh

# --- Redis 连接 ---
redis:
  url: redis://localhost:6379
  publish_enabled: true

# --- 日志 ---
logging:
  level: debug
  node_env: development

3.3 Python 实现

# engine/config/settings.py
"""中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。"""

from pathlib import Path
from typing import Literal

import yaml
from pydantic import BaseModel, Field, field_validator


# ============================================================
# 1. Pydantic 校验模型(镜像 TypeScript config/validators.ts
# ============================================================

class DbConfig(BaseModel):
    """TimescaleDB / PostgreSQL 连接参数"""
    host: str = "localhost"
    port: int = Field(default=5432, ge=1, le=65535)
    name: str                                    # 对应 YAML 中的 name
    user: str
    password: str


class RedisConfig(BaseModel):
    """Redis 连接配置"""
    url: str = "redis://localhost:6379"
    publish_enabled: bool = True


class LoggingConfig(BaseModel):
    """日志配置"""
    level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info"
    node_env: Literal["development", "production"] = "development"


class EnvConfig(BaseModel):
    """env.yaml 顶层结构"""
    db: DbConfig
    redis: RedisConfig
    logging: LoggingConfig


# ============================================================
# 2. 定位并加载 env.yaml
# ============================================================

def _get_project_root() -> Path:
    """
    计算项目根目录的绝对路径。
    engine/config/settings.py → engine/config/ → engine/ → <project_root>/
    """
    return Path(__file__).resolve().parent.parent.parent


def _load_yaml_config() -> dict:
    """读取项目根目录 env.yaml,返回原始字典。文件不存在时抛出明确错误。"""
    root = _get_project_root()
    yaml_path = root / "env.yaml"

    if not yaml_path.exists():
        raise FileNotFoundError(
            f"[config] 无法读取配置文件: {yaml_path}\n"
            f"请确保项目根目录存在 env.yaml。"
        )

    with open(yaml_path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    if data is None:
        raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}")

    return data


# ============================================================
# 3. 加载 & 校验 & 导出
# ============================================================

_raw = _load_yaml_config()
_config = EnvConfig.model_validate(_raw)

# --- 按职责分组的导出配置对象 ---

# TimescaleDB 连接参数(可直接用于 asyncpg.create_pool
db = _config.db

# Redis 连接参数
redis = _config.redis

# 日志参数
logging = _config.logging

# --- asyncpg DSN(便捷构造)---
DB_DSN = (
    f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}"
)


def print_config_summary() -> None:
    """打印脱敏后的配置概要(不含密码明文)"""
    print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}")
    print(f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}")
    print(f"[config] Logging: level={logging.level}, env={logging.node_env}")

3.4 与 TypeScript 侧对比

步骤 TypeScript (data/config/index.ts) Python (engine/config/settings.py)
路径计算 fileURLToPath(import.meta.url)../.. Path(__file__).resolve().parent.parent.parent
YAML 解析 parse() from yaml npm 包 yaml.safe_load() from PyYAML
类型校验 validateConfig() 使用 zod EnvConfig.model_validate() 使用 Pydantic
导出方式 分组 const 对象 (pgsql, redis, logging) 分组模块级变量 (db, redis, logging)
DSN 构造 不构造,由 TypeORM DataSource 使用拆解字段 额外导出 DB_DSN 供 asyncpg 使用

4. 数据读取 — TimescaleDB K 线查询

4.1 连接池管理

# engine/data/db.py
"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。"""

import asyncpg
from engine.config.settings import db as db_config, DB_DSN

# 模块级连接池(单例)
_pool: asyncpg.Pool | None = None


async def get_pool() -> asyncpg.Pool:
    """获取或创建 asyncpg 连接池(懒初始化,复用)。"""
    global _pool
    if _pool is None:
        _pool = await asyncpg.create_pool(
            dsn=DB_DSN,
            min_size=2,
            max_size=10,
            command_timeout=30,  # 查询超时 30 秒
            # 只读模式:在连接级别设置,防止误写
            server_settings={"default_transaction_read_only": "on"},
        )
    return _pool


async def close_pool() -> None:
    """关闭连接池(应用退出时调用)"""
    global _pool
    if _pool:
        await _pool.close()
        _pool = None

4.2 K 线读取器

# engine/data/reader.py
"""
K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。

所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。
参考:data/db/entities/kline.entity.ts
"""

from datetime import datetime
from typing import Optional, Sequence

import asyncpg

from engine.data.db import get_pool
from engine.data.models import KlineRecord


class KlineReader:
    """TimescaleDB K 线只读查询器"""

    def __init__(self):
        self._pool: Optional[asyncpg.Pool] = None

    async def _ensure_pool(self) -> asyncpg.Pool:
        if self._pool is None:
            self._pool = await get_pool()
        return self._pool

    # ----------------------------------------------------------
    # 核心查询方法
    # ----------------------------------------------------------

    async def get_klines(
        self,
        symbol: str,
        interval: str,
        start_time: datetime,
        end_time: datetime,
        exchange: str = "binance",
        limit: int = 1000,
    ) -> list[KlineRecord]:
        """
        查询指定时间范围内的 K 线数据。

        字段映射 — data/db/entities/kline.entity.ts:
            time          → Kline.time          (timestamptz)
            exchange      → Kline.exchange      (text)
            symbol        → Kline.symbol        (text)
            interval      → Kline.interval      (text)
            open          → Kline.open          (numeric)
            high          → Kline.high          (numeric)
            low           → Kline.low           (numeric)
            close         → Kline.close         (numeric)
            volume        → Kline.volume        (numeric)
            quote_volume  → Kline.quote_volume  (nullable)
            trade_count   → Kline.trade_count   (nullable)
            is_closed     → Kline.is_closed     (boolean)
        """
        pool = await self._ensure_pool()

        query = """
            SELECT
                time,
                exchange,
                symbol,
                interval,
                open,
                high,
                low,
                close,
                volume,
                quote_volume,
                taker_buy_base_vol,
                taker_buy_quote_vol,
                trade_count,
                is_closed
            FROM klines
            WHERE exchange = $1
              AND symbol   = $2
              AND interval = $3
              AND time    >= $4
              AND time    <= $5
            ORDER BY time ASC
            LIMIT $6
        """

        rows: Sequence[asyncpg.Record] = await pool.fetch(
            query,
            exchange,
            symbol,
            interval,
            start_time,
            end_time,
            limit,
        )

        return [KlineRecord.from_record(row) for row in rows]

    async def get_latest_klines(
        self,
        symbol: str,
        interval: str,
        exchange: str = "binance",
        limit: int = 500,
    ) -> list[KlineRecord]:
        """
        获取最近 N 根已闭合的 K 线(策略启动时快速预热)。

        仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。
        """
        pool = await self._ensure_pool()

        query = """
            SELECT
                time, exchange, symbol, interval,
                open, high, low, close, volume,
                quote_volume, taker_buy_base_vol, taker_buy_quote_vol,
                trade_count, is_closed
            FROM klines
            WHERE exchange = $1
              AND symbol   = $2
              AND interval = $3
              AND is_closed = TRUE
            ORDER BY time DESC
            LIMIT $4
        """

        rows = await pool.fetch(query, exchange, symbol, interval, limit)

        # 正序返回(时间升序)
        records = [KlineRecord.from_record(row) for row in rows]
        records.reverse()
        return records

    async def get_ohlcv_array(
        self,
        symbol: str,
        interval: str,
        exchange: str = "binance",
        limit: int = 200,
    ) -> list[tuple[datetime, float, float, float, float, float]]:
        """
        获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。

        返回格式: [(timestamp, open, high, low, close, volume), ...]
        时间升序排列。
        """
        pool = await self._ensure_pool()

        query = """
            SELECT time, open, high, low, close, volume
            FROM klines
            WHERE exchange = $1
              AND symbol   = $2
              AND interval = $3
            ORDER BY time DESC
            LIMIT $4
        """

        rows = await pool.fetch(query, exchange, symbol, interval, limit)

        return [
            (row["time"], row["open"], row["high"], row["low"], row["close"], row["volume"])
            for row in reversed(rows)
        ]

    async def get_available_symbols(
        self,
        exchange: str = "binance",
    ) -> list[str]:
        """查询 TimescaleDB 中有 K 线数据的交易对列表。"""
        pool = await self._ensure_pool()

        rows = await pool.fetch(
            """
            SELECT DISTINCT symbol
            FROM klines
            WHERE exchange = $1
            ORDER BY symbol
            """,
            exchange,
        )

        return [row["symbol"] for row in rows]

4.3 查询模式与索引对齐

Kline 实体的复合主键 (exchange, symbol, interval, time) 决定了最常用的查询模式。以下查询均能命中主键索引:

查询场景 SQL 条件 索引命中
回测:某交易对某周期的时间范围 WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND time>=$4 AND time<=$5 ORDER BY time 主键索引前导列精确匹配
最新 K 线:已闭合的最近 N 根 WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND is_closed=TRUE ORDER BY time DESC LIMIT N 主键索引 + 顺序扫描(LIMIT 小)
可用交易对枚举 SELECT DISTINCT symbol WHERE exchange=$1 ⚠️ 全表扫描 — 建议从 trading_pairs 表获取

性能建议get_available_symbols() 应优先查询 trading_pairs 表(关系数据,TypeORM 管理域),而非扫描 klines 时序表。


5. 实体映射 — Python ↔ TypeORM 类型对齐

5.1 映射策略

Python 侧的数据模型镜像 TypeScript 侧 TypeORM 实体,保证双端数据结构一致性。映射规则:

TypeScript (TypeORM Entity)  →  Python (Pydantic Model)
════════════════════════════     ════════════════════════
@PrimaryColumn() timestamptz  →  datetime
@Column("text")               →  str
@Column("numeric", 20, 8)     →  float (Python float = C double, 足够 20 位精度)
@Column("boolean")            →  bool
@Column("integer")            →  int
@Column({ nullable: true })   →  Optional[...]

5.2 Python 数据模型

# engine/data/models.py
"""
Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。

命名约定:
  - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair
  - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case
  - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示
"""

from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


# ============================================================
# Kline 模型 — 镜像 data/db/entities/kline.entity.ts
# ============================================================

class KlineRecord(BaseModel):
    """
    K 线数据记录。

    映射关系:
        TypeORM Kline.time               → KlineRecord.time
        TypeORM Kline.exchange           → KlineRecord.exchange
        TypeORM Kline.symbol             → KlineRecord.symbol
        TypeORM Kline.interval           → KlineRecord.interval
        TypeORM Kline.open               → KlineRecord.open
        TypeORM Kline.high               → KlineRecord.high
        TypeORM Kline.low                → KlineRecord.low
        TypeORM Kline.close              → KlineRecord.close
        TypeORM Kline.volume             → KlineRecord.volume
        TypeORM Kline.quote_volume       → KlineRecord.quote_volume
        TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol
        TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol
        TypeORM Kline.trade_count        → KlineRecord.trade_count
        TypeORM Kline.is_closed          → KlineRecord.is_closed
    """

    time: datetime
    exchange: str
    symbol: str
    interval: str

    # OHLCV
    open: float
    high: float
    low: float
    close: float
    volume: float

    # 扩展字段(nullable → Optional
    quote_volume: Optional[float] = None
    taker_buy_base_vol: Optional[float] = None
    taker_buy_quote_vol: Optional[float] = None
    trade_count: Optional[int] = None

    # 状态
    is_closed: bool = True

    @classmethod
    def from_record(cls, record) -> "KlineRecord":
        """
        从 asyncpg.Record 构造 KlineRecord 实例。

        asyncpg 返回的 Record 对象行为类似 dict
        数值列自动转为 Python float/inttimestamptz 自动转为 datetime。
        """
        return cls(
            time=record["time"],
            exchange=record["exchange"],
            symbol=record["symbol"],
            interval=record["interval"],
            open=float(record["open"]),
            high=float(record["high"]),
            low=float(record["low"]),
            close=float(record["close"]),
            volume=float(record["volume"]),
            quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None,
            taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None,
            taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None,
            trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None,
            is_closed=bool(record["is_closed"]),
        )


# ============================================================
# TradingPair 模型 — 镜像 data/db/entities/trading-pair.entity.ts
# ============================================================

class TradingPairInfo(BaseModel):
    """
    交易对配置信息(轻量版,仅包含策略决策所需的字段)。

    完整实体参考 data/db/entities/trading-pair.entity.ts
    """
    symbol: str
    exchange: str
    base_asset: str
    quote_asset: str
    price_precision: int
    quantity_precision: int
    min_qty: Optional[float] = None
    min_notional: Optional[float] = None
    active: bool = True


# ============================================================
# 策略信号模型
# ============================================================

class Signal(BaseModel):
    """
    交易信号。

    策略 generate_signal() 方法的返回值,
    由 BaseStrategy.do_action() 消费执行。
    """
    symbol: str
    signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$")
    price: Optional[float] = None         # 限价单价格(None = 市价单)
    quantity: Optional[float] = None      # 下单数量(None = 使用风控模块计算的默认值)
    reason: str = ""                      # 信号产生原因(便于日志审计)
    timestamp: datetime = Field(default_factory=datetime.utcnow)

5.3 字段对应速查表

TypeORM Entity (data/db/entities/kline.entity.ts) Python Model (engine/data/models.py) DB 列类型 Python 类型
time (PrimaryColumn, timestamptz) KlineRecord.time TIMESTAMPTZ datetime
exchange (PrimaryColumn, text) KlineRecord.exchange TEXT str
symbol (PrimaryColumn, text) KlineRecord.symbol TEXT str
interval (PrimaryColumn, text) KlineRecord.interval TEXT str
open (numeric 20,8) KlineRecord.open NUMERIC(20,8) float
high (numeric 20,8) KlineRecord.high NUMERIC(20,8) float
low (numeric 20,8) KlineRecord.low NUMERIC(20,8) float
close (numeric 20,8) KlineRecord.close NUMERIC(20,8) float
volume (numeric 20,8) KlineRecord.volume NUMERIC(20,8) float
quote_volume (nullable) KlineRecord.quote_volume NUMERIC(20,8) Optional[float]
taker_buy_base_vol (nullable) KlineRecord.taker_buy_base_vol NUMERIC(20,8) Optional[float]
taker_buy_quote_vol (nullable) KlineRecord.taker_buy_quote_vol NUMERIC(20,8) Optional[float]
trade_count (nullable) KlineRecord.trade_count INTEGER Optional[int]
is_closed KlineRecord.is_closed BOOLEAN bool

6. 策略基类设计

# engine/strategy/base.py
"""策略抽象基类 —— 所有交易策略的父类。"""

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional

from engine.data.models import KlineRecord, Signal


class BaseStrategy(ABC):
    """
    策略基类。

    子类必须实现:
      - generate_signal(): 基于 K 线数据生成交易信号
      - on_kline(): K 线到达时的回调(可在此更新内部状态)

    生命周期:
      1. __init__()        — 初始化策略参数和内状态
      2. on_kline()        — 每根新 K 线到达时调用(更新内部指标状态)
      3. generate_signal() — 被回测引擎/实盘调度器调用,生成买卖信号
    """

    def __init__(self, symbol: str, interval: str, **kwargs):
        self.symbol = symbol
        self.interval = interval
        self._klines: list[KlineRecord] = []  # 内部 K 线缓存
        self._last_signal: Optional[Signal] = None

    @abstractmethod
    async def generate_signal(self) -> Optional[Signal]:
        """
        基于当前缓存的 K 线数据生成交易信号。

        Returns:
            Signal(BUY/SELL) — 触发交易
            None             — 无信号,继续持仓/观望
        """
        ...

    async def on_kline(self, kline: KlineRecord) -> None:
        """
        新 K 线到达回调(默认实现:追加到缓存)。

        子类可重写以更新技术指标缓存(如 MA、RSI 中间计算结果)。
        """
        self._klines.append(kline)

    async def warm_up(self, klines: list[KlineRecord]) -> None:
        """
        策略预热:加载历史 K 线,初始化内部状态。

        回测引擎在开始回测前调用;实盘调度器在策略启动时调用。
        """
        self._klines = sorted(klines, key=lambda k: k.time)

    @property
    def is_ready(self) -> bool:
        """策略是否已就绪(缓存了足够的历史 K 线)"""
        return len(self._klines) >= self.min_klines_required

    @property
    @abstractmethod
    def min_klines_required(self) -> int:
        """策略正常运行所需的最少 K 线数量(如双均线策略需要 slow_period+1 根)"""
        ...

7. 回测引擎设计

# engine/backtest/engine.py
"""
事件驱动回测引擎。

核心流程:
  1. 从 TimescaleDB 加载历史 K 线
  2. 按时间顺序逐根喂给策略
  3. 收集信号 → 模拟成交 → 计算收益
  4. 输出绩效报告(夏普比率、最大回撤、胜率等)
"""

from dataclasses import dataclass, field
from datetime import datetime

from engine.data.reader import KlineReader
from engine.data.models import KlineRecord, Signal
from engine.strategy.base import BaseStrategy


@dataclass
class BacktestResult:
    """回测结果"""
    symbol: str
    interval: str
    start_time: datetime
    end_time: datetime
    total_klines: int
    total_signals: int
    total_return: float          # 总收益率
    annual_return: float         # 年化收益率
    max_drawdown: float          # 最大回撤
    sharpe_ratio: float          # 夏普比率
    win_rate: float              # 胜率
    profit_factor: float         # 盈亏比
    equity_curve: list[float] = field(default_factory=list)  # 权益曲线


class BacktestEngine:
    """回测引擎"""

    def __init__(self, reader: KlineReader):
        self.reader = reader

    async def run(
        self,
        strategy: BaseStrategy,
        symbol: str,
        interval: str,
        start_time: datetime,
        end_time: datetime,
        exchange: str = "binance",
    ) -> BacktestResult:
        """
        执行回测。

        Args:
            strategy: 待回测的策略实例
            symbol: 交易对
            interval: K 线周期
            start_time: 回测起始时间
            end_time: 回测结束时间
            exchange: 交易所
        """
        # 1. 加载历史 K 线
        klines = await self.reader.get_klines(
            symbol=symbol,
            interval=interval,
            start_time=start_time,
            end_time=end_time,
            exchange=exchange,
        )

        if len(klines) == 0:
            raise ValueError(f"回测区间内无 K 线数据: {symbol} {interval} [{start_time}, {end_time}]")

        # 2. 策略预热
        await strategy.warm_up(klines)

        # 3. 逐根 K 线回放
        signals: list[Signal] = []
        for kline in klines:
            await strategy.on_kline(kline)
            if strategy.is_ready:
                signal = await strategy.generate_signal()
                if signal is not None:
                    signals.append(signal)

        # 4. 计算绩效指标
        return self._compute_metrics(
            symbol=symbol,
            interval=interval,
            start_time=start_time,
            end_time=end_time,
            klines=klines,
            signals=signals,
        )

    def _compute_metrics(
        self,
        symbol: str,
        interval: str,
        start_time: datetime,
        end_time: datetime,
        klines: list[KlineRecord],
        signals: list[Signal],
    ) -> BacktestResult:
        """计算回测绩效指标(框架方法,实际计算委托 metrics.py"""
        from engine.backtest.metrics import compute_metrics

        return compute_metrics(
            symbol=symbol,
            interval=interval,
            start_time=start_time,
            end_time=end_time,
            klines=klines,
            signals=signals,
        )

8. 最小可运行示例

8.1 安装依赖

# engine/pyproject.toml
[tool.poetry.dependencies]
python = "^3.10"
asyncpg = "^0.29"
pydantic = "^2.5"
pyyaml = "^6.0"
loguru = "^0.7"
pandas = "^2.0"
numpy = "^1.26"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
cd engine
poetry install

8.2 运行示例

# engine/__main__.py
"""
最小可运行示例 —— 验证 TimescaleDB 连接、配置读取、K 线查询。

运行方式:
  cd engine
  poetry run python -m engine
"""

import asyncio
from datetime import datetime, timedelta, timezone

from engine.config.settings import print_config_summary, db as db_config
from engine.data.db import get_pool, close_pool
from engine.data.reader import KlineReader


async def main():
    # 1. 打印配置概要(验证 env.yaml 读取)
    print_config_summary()

    # 2. 测试 TimescaleDB 连接
    pool = await get_pool()
    async with pool.acquire() as conn:
        version = await conn.fetchval("SELECT version()")
        print(f"[db] TimescaleDB 连接成功: {version}")

    # 3. 查询最近 10 根 BTCUSDT 1h K 线
    reader = KlineReader()
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=24)

    klines = await reader.get_klines(
        symbol="BTCUSDT",
        interval="1h",
        start_time=start,
        end_time=end,
        exchange="binance",
        limit=10,
    )

    print(f"\n[data] 查询到 {len(klines)} 根 K 线:")
    for k in klines[:5]:
        print(f"  {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} "
              f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}")

    # 4. 清理
    await close_pool()


if __name__ == "__main__":
    asyncio.run(main())

8.3 完整启动流程

# engine/run.py
"""策略引擎完整启动流程示例"""

import asyncio
from datetime import datetime, timedelta, timezone

from engine.config.settings import print_config_summary
from engine.data.db import get_pool, close_pool
from engine.data.reader import KlineReader
from engine.strategy.ma_cross import MACrossStrategy  # 双均线策略示例
from engine.backtest.engine import BacktestEngine


async def run_backtest():
    print_config_summary()

    reader = KlineReader()
    engine = BacktestEngine(reader)

    # 策略参数
    strategy = MACrossStrategy(
        symbol="BTCUSDT",
        interval="1h",
        fast_period=5,    # 快线周期
        slow_period=20,   # 慢线周期
    )

    # 回测区间:最近 90 天
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=90)

    print(f"\n[backtest] 开始回测: {strategy.symbol} {strategy.interval} "
          f"[{start.date()} ~ {end.date()}]")

    result = await engine.run(
        strategy=strategy,
        symbol="BTCUSDT",
        interval="1h",
        start_time=start,
        end_time=end,
    )

    # 输出绩效
    print(f"\n{'='*50}")
    print(f"回测绩效报告")
    print(f"{'='*50}")
    print(f"  K 线总数:      {result.total_klines}")
    print(f"  交易信号数:    {result.total_signals}")
    print(f"  总收益率:      {result.total_return:.2%}")
    print(f"  年化收益率:    {result.annual_return:.2%}")
    print(f"  最大回撤:      {result.max_drawdown:.2%}")
    print(f"  夏普比率:      {result.sharpe_ratio:.2f}")
    print(f"  胜率:          {result.win_rate:.2%}")
    print(f"  盈亏比:        {result.profit_factor:.2f}")
    print(f"{'='*50}")

    await close_pool()


if __name__ == "__main__":
    asyncio.run(run_backtest())

9. 性能考量

9.1 查询优化

场景 优化策略 效果
大数据量回测> 10 万根 K 线) 使用 asyncpg.Cursor 流式读取,避免一次性加载到内存 内存峰值从 O(n) 降到 O(batch_size)
多币种并发查询 asyncio.gather() 并行查询多个 symbol N 个币种查询时间 ≈ max(单币种时间),而非 sum
重复区间查询 策略层缓存已查询的 K 线,避免重复 DB 查询 减少 90% 的重复 I/O
高周期 K 线 若 TimescaleDB 配置了连续聚合视图,直接查询物化视图而非原始 1m 表 查询速度提升 10-100×

9.2 内存管理

# 流式读取大数据量 K 线(避免 OOM)
async def get_klines_streaming(
    pool: asyncpg.Pool,
    symbol: str,
    interval: str,
    start_time: datetime,
    end_time: datetime,
    batch_size: int = 5000,
):
    """流式读取 K 线,每次返回 batch_size 条,适合百万级回测。"""
    async with pool.acquire() as conn:
        async with conn.transaction():
            cursor = await conn.cursor(
                """
                SELECT * FROM klines
                WHERE exchange=$1 AND symbol=$2 AND interval=$3
                  AND time>=$4 AND time<=$5
                ORDER BY time ASC
                """
            )
            # 省略参数绑定细节...
            async for row in cursor:
                yield KlineRecord.from_record(row)

9.3 向量化计算建议

策略中的技术指标计算(MA、RSI、MACD 等)若涉及全量历史数据,应使用 pandas + numpy 向量化计算,而非 Python 循环:

import pandas as pd
import numpy as np

def compute_sma_vectorized(klines: list[KlineRecord], period: int) -> np.ndarray:
    """向量化计算简单移动平均 — 比纯 Python 循环快 50-100×"""
    closes = np.array([k.close for k in klines], dtype=np.float64)
    return pd.Series(closes).rolling(window=period).mean().to_numpy()

10. 风险提示

⚠️ 风险声明:数字货币交易具有高风险,本系统仅提供技术执行工具。策略盈亏由使用者自负,开发者不做任何收益承诺。

10.1 数据相关风险

风险 说明 缓解措施
数据缺口 TimescaleDB 中 K 线可能因 data 模块采集中断而存在缺口 策略内检测 time 间隔,缺口处跳过或标记
延迟数据 网络延迟导致最新 K 线不完整 is_closed=False 的 K 线不应参与信号计算
精度损失 Python floatIEEE 754 double)约 15-17 位有效数字,对标 NUMERIC(20,8) 足够,但极值场景需使用 Decimal 价格/成交量使用 float;累积资金计算使用 Decimal

10.2 策略相关风险

风险 说明 缓解措施
过拟合 参数在回测区间表现优异但实盘失效 使用 Walk-Forward 分析,保留样本外验证集
未来信息泄露 回测中不当使用了当前时间点之后的数据 generate_signal() 仅允许访问截止当前 K 线的历史数据
滑点与手续费 回测未考虑实际交易成本 回测引擎中配置滑点模型和手续费率

10.3 跨模块一致性

检查项 验证方法
Python 模型字段与 TypeORM 实体对齐 对照 data/db/entities/kline.entity.ts 逐一核对
env.yaml 配置项双端一致 对照 data/config/index.tsEnvConfig 类型
K 线查询 SQL 的列名与实体一致 运行 __main__.py 示例验证查询返回数据

附录 A:依赖清单

# engine/pyproject.toml
[tool.poetry]
name = "trade-engine"
version = "0.1.0"
description = "数字货币量化交易系统 - Python 策略引擎"
authors = ["Trade Team"]

[tool.poetry.dependencies]
python = "^3.10"
asyncpg = "^0.29"        # 异步 PostgreSQL 驱动(TimescaleDB 兼容)
pydantic = "^2.5"        # 运行时类型校验(与 TypeScript Zod 对称)
pyyaml = "^6.0"          # 解析 env.yaml(与 TypeScript yaml 包对称)
loguru = "^0.7"          # 结构化日志
pandas = "^2.0"          # 技术指标向量化计算
numpy = "^1.26"          # 数值计算基础

[tool.poetry.group.dev.dependencies]
pytest = "^8.0"
pytest-asyncio = "^0.23"
mypy = "^1.8"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

附录 BTypeScript 实体速查

以下为 data 模块实体的关键字段摘要,供 Python 开发时快速对照。 完整定义见 data/db/entities/kline.entity.ts

实体 表名 主键 备注
Kline klines (exchange, symbol, interval, time) TimescaleDB hypertable,复合主键
Exchange exchanges id (UUID) TypeORM 管理域,继承 CommonBaseEntity
TradingPair trading_pairs id (UUID) TypeORM 管理域,外键关联 exchanges