- config/settings.py:Pydantic 解析 env.yaml - data/db.py:asyncpg 连接池管理 - data/reader.py:KlineReader 只读查询 TimescaleDB - data/models.py:KlineRecord 等 Pydantic 模型,镜像 TypeORM 实体 - example/test_db.py:数据库查询验证示例 - README.md:引擎架构文档
39 KiB
Engine Module — Python 策略引擎架构文档
模块定位:数字货币量化交易系统的业务层,负责策略开发、回测分析、信号生成,从 TimescaleDB 读取 data 模块持久化的 K 线数据进行策略决策。
运行时:Python 3.10+ | 依赖管理:Poetry / uv | 数据库驱动:asyncpg | 配置解析:PyYAML
目录
- 模块定位与架构边界
- 目录结构
- 配置管理 — 读取 env.yaml
- 数据读取 — TimescaleDB K 线查询
- 实体映射 — Python ↔ TypeORM 类型对齐
- 策略基类设计
- 回测引擎设计
- 最小可运行示例
- 性能考量
- 风险提示
1. 模块定位与架构边界
1.1 系统分层中的位置
┌──────────────────────────────────────────────────────────────┐
│ 🐍 Python Engine (本模块) │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │ 策略引擎 │ │ 回测引擎 │ │ 信号分发器 │ │
│ │ (strategy/) │ │ (backtest/) │ │ (signals/) │ │
│ └──────┬───────┘ └──────┬───────┘ └────────┬──────────┘ │
│ │ │ │ │
│ └─────────────────┼────────────────────┘ │
│ │ │
│ ┌────────────────────────▼──────────────────────────────┐ │
│ │ 数据访问层 (data/) │ │
│ │ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ TimescaleDB │ │ env.yaml │ │ Redis │ │ │
│ │ │ Reader │ │ Config │ │ Subscriber │ │ │
│ │ └─────────────┘ └──────────────┘ └──────────────┘ │ │
│ └───────────────────────────────────────────────────────┘ │
└──────────────────────────┬───────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ TimescaleDB │ │ env.yaml │ │ Redis │
│ (data 模块 │ │ (项目根目录) │ │ (行情消息) │
│ 写入的K线) │ │ │ │ │
└──────────────┘ └──────────────┘ └──────────────┘
1.2 与 data 模块的关系
| 维度 | data 模块 (TypeScript) | engine 模块 (Python) |
|---|---|---|
| 职责 | 行情采集、K 线合成、数据持久化 | 策略计算、回测、信号生成 |
| 数据库操作 | 写入(UPSERT K 线到 TimescaleDB) | 只读(SELECT K 线进行策略分析) |
| 配置来源 | data/config/index.ts 读取 env.yaml |
engine/config.py 读取同一份 env.yaml |
| 实体定义 | TypeORM 装饰器实体 (data/db/entities/) |
Pydantic/dataclass 模型(镜像 TypeORM 结构) |
| 运行时 | Bun / Node.js | CPython 3.10+ |
设计原则:engine 模块对 TimescaleDB 只读,绝不写入。K 线数据的写入由 data 模块全权负责,避免双端写入导致的数据一致性问题。
2. 目录结构
engine/
├── README.md # 本架构文档
├── pyproject.toml # Python 项目依赖(Poetry)
├── __init__.py
│
├── config/ # 配置管理
│ ├── __init__.py
│ └── settings.py # 读取 ../../env.yaml,Pydantic 校验
│
├── data/ # 数据访问层(只读 TimescaleDB)
│ ├── __init__.py
│ ├── db.py # asyncpg 连接池管理
│ ├── reader.py # K 线查询方法封装
│ └── models.py # Python 数据模型(镜像 TypeORM 实体)
│
├── strategy/ # 策略实现
│ ├── __init__.py
│ ├── base.py # BaseStrategy 抽象基类
│ ├── ma_cross.py # 双均线交叉策略示例
│ └── grid.py # 网格交易策略示例
│
├── backtest/ # 回测引擎
│ ├── __init__.py
│ ├── engine.py # 事件驱动回测引擎
│ └── metrics.py # 绩效指标计算(夏普比率、最大回撤等)
│
├── signals/ # 信号分发
│ ├── __init__.py
│ └── dispatcher.py # 信号生成与分发
│
├── common/ # 公共工具
│ ├── __init__.py
│ ├── logger.py # loguru 日志配置
│ └── utils.py # 时间工具、重试装饰器等
│
└── tests/ # 单元测试
├── __init__.py
├── test_config.py
├── test_reader.py
└── test_strategy.py
3. 配置管理 — 读取 env.yaml
3.1 设计思路
engine 模块与 data 模块共享项目根目录的 env.yaml。Python 侧通过 PyYAML 解析,Pydantic 进行运行时校验——这与 TypeScript 侧 data/config/index.ts 使用 yaml + zod 的模式完全对称。
3.2 配置文件结构回顾
env.yaml(项目根目录,data 与 engine 共享):
# --- TimescaleDB / PostgreSQL 连接 ---
db:
host: localhost
port: 5432
name: trade
user: trader
password: fucketh
# --- Redis 连接 ---
redis:
url: redis://localhost:6379
publish_enabled: true
# --- 日志 ---
logging:
level: debug
node_env: development
3.3 Python 实现
# engine/config/settings.py
"""中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。"""
from pathlib import Path
from typing import Literal
import yaml
from pydantic import BaseModel, Field, field_validator
# ============================================================
# 1. Pydantic 校验模型(镜像 TypeScript config/validators.ts)
# ============================================================
class DbConfig(BaseModel):
"""TimescaleDB / PostgreSQL 连接参数"""
host: str = "localhost"
port: int = Field(default=5432, ge=1, le=65535)
name: str # 对应 YAML 中的 name
user: str
password: str
class RedisConfig(BaseModel):
"""Redis 连接配置"""
url: str = "redis://localhost:6379"
publish_enabled: bool = True
class LoggingConfig(BaseModel):
"""日志配置"""
level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info"
node_env: Literal["development", "production"] = "development"
class EnvConfig(BaseModel):
"""env.yaml 顶层结构"""
db: DbConfig
redis: RedisConfig
logging: LoggingConfig
# ============================================================
# 2. 定位并加载 env.yaml
# ============================================================
def _get_project_root() -> Path:
"""
计算项目根目录的绝对路径。
engine/config/settings.py → engine/config/ → engine/ → <project_root>/
"""
return Path(__file__).resolve().parent.parent.parent
def _load_yaml_config() -> dict:
"""读取项目根目录 env.yaml,返回原始字典。文件不存在时抛出明确错误。"""
root = _get_project_root()
yaml_path = root / "env.yaml"
if not yaml_path.exists():
raise FileNotFoundError(
f"[config] 无法读取配置文件: {yaml_path}\n"
f"请确保项目根目录存在 env.yaml。"
)
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if data is None:
raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}")
return data
# ============================================================
# 3. 加载 & 校验 & 导出
# ============================================================
_raw = _load_yaml_config()
_config = EnvConfig.model_validate(_raw)
# --- 按职责分组的导出配置对象 ---
# TimescaleDB 连接参数(可直接用于 asyncpg.create_pool)
db = _config.db
# Redis 连接参数
redis = _config.redis
# 日志参数
logging = _config.logging
# --- asyncpg DSN(便捷构造)---
DB_DSN = (
f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}"
)
def print_config_summary() -> None:
"""打印脱敏后的配置概要(不含密码明文)"""
print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}")
print(f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}")
print(f"[config] Logging: level={logging.level}, env={logging.node_env}")
3.4 与 TypeScript 侧对比
| 步骤 | TypeScript (data/config/index.ts) |
Python (engine/config/settings.py) |
|---|---|---|
| 路径计算 | fileURLToPath(import.meta.url) → ../.. |
Path(__file__).resolve().parent.parent.parent |
| YAML 解析 | parse() from yaml npm 包 |
yaml.safe_load() from PyYAML |
| 类型校验 | validateConfig() 使用 zod |
EnvConfig.model_validate() 使用 Pydantic |
| 导出方式 | 分组 const 对象 (pgsql, redis, logging) |
分组模块级变量 (db, redis, logging) |
| DSN 构造 | 不构造,由 TypeORM DataSource 使用拆解字段 | 额外导出 DB_DSN 供 asyncpg 使用 |
4. 数据读取 — TimescaleDB K 线查询
4.1 连接池管理
# engine/data/db.py
"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。"""
import asyncpg
from engine.config.settings import db as db_config, DB_DSN
# 模块级连接池(单例)
_pool: asyncpg.Pool | None = None
async def get_pool() -> asyncpg.Pool:
"""获取或创建 asyncpg 连接池(懒初始化,复用)。"""
global _pool
if _pool is None:
_pool = await asyncpg.create_pool(
dsn=DB_DSN,
min_size=2,
max_size=10,
command_timeout=30, # 查询超时 30 秒
# 只读模式:在连接级别设置,防止误写
server_settings={"default_transaction_read_only": "on"},
)
return _pool
async def close_pool() -> None:
"""关闭连接池(应用退出时调用)"""
global _pool
if _pool:
await _pool.close()
_pool = None
4.2 K 线读取器
# engine/data/reader.py
"""
K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。
所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。
参考:data/db/entities/kline.entity.ts
"""
from datetime import datetime
from typing import Optional, Sequence
import asyncpg
from engine.data.db import get_pool
from engine.data.models import KlineRecord
class KlineReader:
"""TimescaleDB K 线只读查询器"""
def __init__(self):
self._pool: Optional[asyncpg.Pool] = None
async def _ensure_pool(self) -> asyncpg.Pool:
if self._pool is None:
self._pool = await get_pool()
return self._pool
# ----------------------------------------------------------
# 核心查询方法
# ----------------------------------------------------------
async def get_klines(
self,
symbol: str,
interval: str,
start_time: datetime,
end_time: datetime,
exchange: str = "binance",
limit: int = 1000,
) -> list[KlineRecord]:
"""
查询指定时间范围内的 K 线数据。
字段映射 — data/db/entities/kline.entity.ts:
time → Kline.time (timestamptz)
exchange → Kline.exchange (text)
symbol → Kline.symbol (text)
interval → Kline.interval (text)
open → Kline.open (numeric)
high → Kline.high (numeric)
low → Kline.low (numeric)
close → Kline.close (numeric)
volume → Kline.volume (numeric)
quote_volume → Kline.quote_volume (nullable)
trade_count → Kline.trade_count (nullable)
is_closed → Kline.is_closed (boolean)
"""
pool = await self._ensure_pool()
query = """
SELECT
time,
exchange,
symbol,
interval,
open,
high,
low,
close,
volume,
quote_volume,
taker_buy_base_vol,
taker_buy_quote_vol,
trade_count,
is_closed
FROM klines
WHERE exchange = $1
AND symbol = $2
AND interval = $3
AND time >= $4
AND time <= $5
ORDER BY time ASC
LIMIT $6
"""
rows: Sequence[asyncpg.Record] = await pool.fetch(
query,
exchange,
symbol,
interval,
start_time,
end_time,
limit,
)
return [KlineRecord.from_record(row) for row in rows]
async def get_latest_klines(
self,
symbol: str,
interval: str,
exchange: str = "binance",
limit: int = 500,
) -> list[KlineRecord]:
"""
获取最近 N 根已闭合的 K 线(策略启动时快速预热)。
仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。
"""
pool = await self._ensure_pool()
query = """
SELECT
time, exchange, symbol, interval,
open, high, low, close, volume,
quote_volume, taker_buy_base_vol, taker_buy_quote_vol,
trade_count, is_closed
FROM klines
WHERE exchange = $1
AND symbol = $2
AND interval = $3
AND is_closed = TRUE
ORDER BY time DESC
LIMIT $4
"""
rows = await pool.fetch(query, exchange, symbol, interval, limit)
# 正序返回(时间升序)
records = [KlineRecord.from_record(row) for row in rows]
records.reverse()
return records
async def get_ohlcv_array(
self,
symbol: str,
interval: str,
exchange: str = "binance",
limit: int = 200,
) -> list[tuple[datetime, float, float, float, float, float]]:
"""
获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。
返回格式: [(timestamp, open, high, low, close, volume), ...]
时间升序排列。
"""
pool = await self._ensure_pool()
query = """
SELECT time, open, high, low, close, volume
FROM klines
WHERE exchange = $1
AND symbol = $2
AND interval = $3
ORDER BY time DESC
LIMIT $4
"""
rows = await pool.fetch(query, exchange, symbol, interval, limit)
return [
(row["time"], row["open"], row["high"], row["low"], row["close"], row["volume"])
for row in reversed(rows)
]
async def get_available_symbols(
self,
exchange: str = "binance",
) -> list[str]:
"""查询 TimescaleDB 中有 K 线数据的交易对列表。"""
pool = await self._ensure_pool()
rows = await pool.fetch(
"""
SELECT DISTINCT symbol
FROM klines
WHERE exchange = $1
ORDER BY symbol
""",
exchange,
)
return [row["symbol"] for row in rows]
4.3 查询模式与索引对齐
Kline 实体的复合主键 (exchange, symbol, interval, time) 决定了最常用的查询模式。以下查询均能命中主键索引:
| 查询场景 | SQL 条件 | 索引命中 |
|---|---|---|
| 回测:某交易对某周期的时间范围 | WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND time>=$4 AND time<=$5 ORDER BY time |
✅ 主键索引前导列精确匹配 |
| 最新 K 线:已闭合的最近 N 根 | WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND is_closed=TRUE ORDER BY time DESC LIMIT N |
✅ 主键索引 + 顺序扫描(LIMIT 小) |
| 可用交易对枚举 | SELECT DISTINCT symbol WHERE exchange=$1 |
⚠️ 全表扫描 — 建议从 trading_pairs 表获取 |
性能建议:
get_available_symbols()应优先查询trading_pairs表(关系数据,TypeORM 管理域),而非扫描klines时序表。
5. 实体映射 — Python ↔ TypeORM 类型对齐
5.1 映射策略
Python 侧的数据模型镜像 TypeScript 侧 TypeORM 实体,保证双端数据结构一致性。映射规则:
TypeScript (TypeORM Entity) → Python (Pydantic Model)
════════════════════════════ ════════════════════════
@PrimaryColumn() timestamptz → datetime
@Column("text") → str
@Column("numeric", 20, 8) → float (Python float = C double, 足够 20 位精度)
@Column("boolean") → bool
@Column("integer") → int
@Column({ nullable: true }) → Optional[...]
5.2 Python 数据模型
# engine/data/models.py
"""
Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。
命名约定:
- 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair)
- 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case
- 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
# ============================================================
# Kline 模型 — 镜像 data/db/entities/kline.entity.ts
# ============================================================
class KlineRecord(BaseModel):
"""
K 线数据记录。
映射关系:
TypeORM Kline.time → KlineRecord.time
TypeORM Kline.exchange → KlineRecord.exchange
TypeORM Kline.symbol → KlineRecord.symbol
TypeORM Kline.interval → KlineRecord.interval
TypeORM Kline.open → KlineRecord.open
TypeORM Kline.high → KlineRecord.high
TypeORM Kline.low → KlineRecord.low
TypeORM Kline.close → KlineRecord.close
TypeORM Kline.volume → KlineRecord.volume
TypeORM Kline.quote_volume → KlineRecord.quote_volume
TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol
TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol
TypeORM Kline.trade_count → KlineRecord.trade_count
TypeORM Kline.is_closed → KlineRecord.is_closed
"""
time: datetime
exchange: str
symbol: str
interval: str
# OHLCV
open: float
high: float
low: float
close: float
volume: float
# 扩展字段(nullable → Optional)
quote_volume: Optional[float] = None
taker_buy_base_vol: Optional[float] = None
taker_buy_quote_vol: Optional[float] = None
trade_count: Optional[int] = None
# 状态
is_closed: bool = True
@classmethod
def from_record(cls, record) -> "KlineRecord":
"""
从 asyncpg.Record 构造 KlineRecord 实例。
asyncpg 返回的 Record 对象行为类似 dict,
数值列自动转为 Python float/int,timestamptz 自动转为 datetime。
"""
return cls(
time=record["time"],
exchange=record["exchange"],
symbol=record["symbol"],
interval=record["interval"],
open=float(record["open"]),
high=float(record["high"]),
low=float(record["low"]),
close=float(record["close"]),
volume=float(record["volume"]),
quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None,
taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None,
taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None,
trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None,
is_closed=bool(record["is_closed"]),
)
# ============================================================
# TradingPair 模型 — 镜像 data/db/entities/trading-pair.entity.ts
# ============================================================
class TradingPairInfo(BaseModel):
"""
交易对配置信息(轻量版,仅包含策略决策所需的字段)。
完整实体参考 data/db/entities/trading-pair.entity.ts
"""
symbol: str
exchange: str
base_asset: str
quote_asset: str
price_precision: int
quantity_precision: int
min_qty: Optional[float] = None
min_notional: Optional[float] = None
active: bool = True
# ============================================================
# 策略信号模型
# ============================================================
class Signal(BaseModel):
"""
交易信号。
策略 generate_signal() 方法的返回值,
由 BaseStrategy.do_action() 消费执行。
"""
symbol: str
signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$")
price: Optional[float] = None # 限价单价格(None = 市价单)
quantity: Optional[float] = None # 下单数量(None = 使用风控模块计算的默认值)
reason: str = "" # 信号产生原因(便于日志审计)
timestamp: datetime = Field(default_factory=datetime.utcnow)
5.3 字段对应速查表
TypeORM Entity (data/db/entities/kline.entity.ts) |
Python Model (engine/data/models.py) |
DB 列类型 | Python 类型 |
|---|---|---|---|
time (PrimaryColumn, timestamptz) |
KlineRecord.time |
TIMESTAMPTZ |
datetime |
exchange (PrimaryColumn, text) |
KlineRecord.exchange |
TEXT |
str |
symbol (PrimaryColumn, text) |
KlineRecord.symbol |
TEXT |
str |
interval (PrimaryColumn, text) |
KlineRecord.interval |
TEXT |
str |
open (numeric 20,8) |
KlineRecord.open |
NUMERIC(20,8) |
float |
high (numeric 20,8) |
KlineRecord.high |
NUMERIC(20,8) |
float |
low (numeric 20,8) |
KlineRecord.low |
NUMERIC(20,8) |
float |
close (numeric 20,8) |
KlineRecord.close |
NUMERIC(20,8) |
float |
volume (numeric 20,8) |
KlineRecord.volume |
NUMERIC(20,8) |
float |
quote_volume (nullable) |
KlineRecord.quote_volume |
NUMERIC(20,8) |
Optional[float] |
taker_buy_base_vol (nullable) |
KlineRecord.taker_buy_base_vol |
NUMERIC(20,8) |
Optional[float] |
taker_buy_quote_vol (nullable) |
KlineRecord.taker_buy_quote_vol |
NUMERIC(20,8) |
Optional[float] |
trade_count (nullable) |
KlineRecord.trade_count |
INTEGER |
Optional[int] |
is_closed |
KlineRecord.is_closed |
BOOLEAN |
bool |
6. 策略基类设计
# engine/strategy/base.py
"""策略抽象基类 —— 所有交易策略的父类。"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
from engine.data.models import KlineRecord, Signal
class BaseStrategy(ABC):
"""
策略基类。
子类必须实现:
- generate_signal(): 基于 K 线数据生成交易信号
- on_kline(): K 线到达时的回调(可在此更新内部状态)
生命周期:
1. __init__() — 初始化策略参数和内状态
2. on_kline() — 每根新 K 线到达时调用(更新内部指标状态)
3. generate_signal() — 被回测引擎/实盘调度器调用,生成买卖信号
"""
def __init__(self, symbol: str, interval: str, **kwargs):
self.symbol = symbol
self.interval = interval
self._klines: list[KlineRecord] = [] # 内部 K 线缓存
self._last_signal: Optional[Signal] = None
@abstractmethod
async def generate_signal(self) -> Optional[Signal]:
"""
基于当前缓存的 K 线数据生成交易信号。
Returns:
Signal(BUY/SELL) — 触发交易
None — 无信号,继续持仓/观望
"""
...
async def on_kline(self, kline: KlineRecord) -> None:
"""
新 K 线到达回调(默认实现:追加到缓存)。
子类可重写以更新技术指标缓存(如 MA、RSI 中间计算结果)。
"""
self._klines.append(kline)
async def warm_up(self, klines: list[KlineRecord]) -> None:
"""
策略预热:加载历史 K 线,初始化内部状态。
回测引擎在开始回测前调用;实盘调度器在策略启动时调用。
"""
self._klines = sorted(klines, key=lambda k: k.time)
@property
def is_ready(self) -> bool:
"""策略是否已就绪(缓存了足够的历史 K 线)"""
return len(self._klines) >= self.min_klines_required
@property
@abstractmethod
def min_klines_required(self) -> int:
"""策略正常运行所需的最少 K 线数量(如双均线策略需要 slow_period+1 根)"""
...
7. 回测引擎设计
# engine/backtest/engine.py
"""
事件驱动回测引擎。
核心流程:
1. 从 TimescaleDB 加载历史 K 线
2. 按时间顺序逐根喂给策略
3. 收集信号 → 模拟成交 → 计算收益
4. 输出绩效报告(夏普比率、最大回撤、胜率等)
"""
from dataclasses import dataclass, field
from datetime import datetime
from engine.data.reader import KlineReader
from engine.data.models import KlineRecord, Signal
from engine.strategy.base import BaseStrategy
@dataclass
class BacktestResult:
"""回测结果"""
symbol: str
interval: str
start_time: datetime
end_time: datetime
total_klines: int
total_signals: int
total_return: float # 总收益率
annual_return: float # 年化收益率
max_drawdown: float # 最大回撤
sharpe_ratio: float # 夏普比率
win_rate: float # 胜率
profit_factor: float # 盈亏比
equity_curve: list[float] = field(default_factory=list) # 权益曲线
class BacktestEngine:
"""回测引擎"""
def __init__(self, reader: KlineReader):
self.reader = reader
async def run(
self,
strategy: BaseStrategy,
symbol: str,
interval: str,
start_time: datetime,
end_time: datetime,
exchange: str = "binance",
) -> BacktestResult:
"""
执行回测。
Args:
strategy: 待回测的策略实例
symbol: 交易对
interval: K 线周期
start_time: 回测起始时间
end_time: 回测结束时间
exchange: 交易所
"""
# 1. 加载历史 K 线
klines = await self.reader.get_klines(
symbol=symbol,
interval=interval,
start_time=start_time,
end_time=end_time,
exchange=exchange,
)
if len(klines) == 0:
raise ValueError(f"回测区间内无 K 线数据: {symbol} {interval} [{start_time}, {end_time}]")
# 2. 策略预热
await strategy.warm_up(klines)
# 3. 逐根 K 线回放
signals: list[Signal] = []
for kline in klines:
await strategy.on_kline(kline)
if strategy.is_ready:
signal = await strategy.generate_signal()
if signal is not None:
signals.append(signal)
# 4. 计算绩效指标
return self._compute_metrics(
symbol=symbol,
interval=interval,
start_time=start_time,
end_time=end_time,
klines=klines,
signals=signals,
)
def _compute_metrics(
self,
symbol: str,
interval: str,
start_time: datetime,
end_time: datetime,
klines: list[KlineRecord],
signals: list[Signal],
) -> BacktestResult:
"""计算回测绩效指标(框架方法,实际计算委托 metrics.py)"""
from engine.backtest.metrics import compute_metrics
return compute_metrics(
symbol=symbol,
interval=interval,
start_time=start_time,
end_time=end_time,
klines=klines,
signals=signals,
)
8. 最小可运行示例
8.1 安装依赖
# engine/pyproject.toml
[tool.poetry.dependencies]
python = "^3.10"
asyncpg = "^0.29"
pydantic = "^2.5"
pyyaml = "^6.0"
loguru = "^0.7"
pandas = "^2.0"
numpy = "^1.26"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
cd engine
poetry install
8.2 运行示例
# engine/__main__.py
"""
最小可运行示例 —— 验证 TimescaleDB 连接、配置读取、K 线查询。
运行方式:
cd engine
poetry run python -m engine
"""
import asyncio
from datetime import datetime, timedelta, timezone
from engine.config.settings import print_config_summary, db as db_config
from engine.data.db import get_pool, close_pool
from engine.data.reader import KlineReader
async def main():
# 1. 打印配置概要(验证 env.yaml 读取)
print_config_summary()
# 2. 测试 TimescaleDB 连接
pool = await get_pool()
async with pool.acquire() as conn:
version = await conn.fetchval("SELECT version()")
print(f"[db] TimescaleDB 连接成功: {version}")
# 3. 查询最近 10 根 BTCUSDT 1h K 线
reader = KlineReader()
end = datetime.now(timezone.utc)
start = end - timedelta(hours=24)
klines = await reader.get_klines(
symbol="BTCUSDT",
interval="1h",
start_time=start,
end_time=end,
exchange="binance",
limit=10,
)
print(f"\n[data] 查询到 {len(klines)} 根 K 线:")
for k in klines[:5]:
print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} "
f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}")
# 4. 清理
await close_pool()
if __name__ == "__main__":
asyncio.run(main())
8.3 完整启动流程
# engine/run.py
"""策略引擎完整启动流程示例"""
import asyncio
from datetime import datetime, timedelta, timezone
from engine.config.settings import print_config_summary
from engine.data.db import get_pool, close_pool
from engine.data.reader import KlineReader
from engine.strategy.ma_cross import MACrossStrategy # 双均线策略示例
from engine.backtest.engine import BacktestEngine
async def run_backtest():
print_config_summary()
reader = KlineReader()
engine = BacktestEngine(reader)
# 策略参数
strategy = MACrossStrategy(
symbol="BTCUSDT",
interval="1h",
fast_period=5, # 快线周期
slow_period=20, # 慢线周期
)
# 回测区间:最近 90 天
end = datetime.now(timezone.utc)
start = end - timedelta(days=90)
print(f"\n[backtest] 开始回测: {strategy.symbol} {strategy.interval} "
f"[{start.date()} ~ {end.date()}]")
result = await engine.run(
strategy=strategy,
symbol="BTCUSDT",
interval="1h",
start_time=start,
end_time=end,
)
# 输出绩效
print(f"\n{'='*50}")
print(f"回测绩效报告")
print(f"{'='*50}")
print(f" K 线总数: {result.total_klines}")
print(f" 交易信号数: {result.total_signals}")
print(f" 总收益率: {result.total_return:.2%}")
print(f" 年化收益率: {result.annual_return:.2%}")
print(f" 最大回撤: {result.max_drawdown:.2%}")
print(f" 夏普比率: {result.sharpe_ratio:.2f}")
print(f" 胜率: {result.win_rate:.2%}")
print(f" 盈亏比: {result.profit_factor:.2f}")
print(f"{'='*50}")
await close_pool()
if __name__ == "__main__":
asyncio.run(run_backtest())
9. 性能考量
9.1 查询优化
| 场景 | 优化策略 | 效果 |
|---|---|---|
| 大数据量回测(> 10 万根 K 线) | 使用 asyncpg.Cursor 流式读取,避免一次性加载到内存 |
内存峰值从 O(n) 降到 O(batch_size) |
| 多币种并发查询 | asyncio.gather() 并行查询多个 symbol |
N 个币种查询时间 ≈ max(单币种时间),而非 sum |
| 重复区间查询 | 策略层缓存已查询的 K 线,避免重复 DB 查询 | 减少 90% 的重复 I/O |
| 高周期 K 线 | 若 TimescaleDB 配置了连续聚合视图,直接查询物化视图而非原始 1m 表 | 查询速度提升 10-100× |
9.2 内存管理
# 流式读取大数据量 K 线(避免 OOM)
async def get_klines_streaming(
pool: asyncpg.Pool,
symbol: str,
interval: str,
start_time: datetime,
end_time: datetime,
batch_size: int = 5000,
):
"""流式读取 K 线,每次返回 batch_size 条,适合百万级回测。"""
async with pool.acquire() as conn:
async with conn.transaction():
cursor = await conn.cursor(
"""
SELECT * FROM klines
WHERE exchange=$1 AND symbol=$2 AND interval=$3
AND time>=$4 AND time<=$5
ORDER BY time ASC
"""
)
# 省略参数绑定细节...
async for row in cursor:
yield KlineRecord.from_record(row)
9.3 向量化计算建议
策略中的技术指标计算(MA、RSI、MACD 等)若涉及全量历史数据,应使用 pandas + numpy 向量化计算,而非 Python 循环:
import pandas as pd
import numpy as np
def compute_sma_vectorized(klines: list[KlineRecord], period: int) -> np.ndarray:
"""向量化计算简单移动平均 — 比纯 Python 循环快 50-100×"""
closes = np.array([k.close for k in klines], dtype=np.float64)
return pd.Series(closes).rolling(window=period).mean().to_numpy()
10. 风险提示
⚠️ 风险声明:数字货币交易具有高风险,本系统仅提供技术执行工具。策略盈亏由使用者自负,开发者不做任何收益承诺。
10.1 数据相关风险
| 风险 | 说明 | 缓解措施 |
|---|---|---|
| 数据缺口 | TimescaleDB 中 K 线可能因 data 模块采集中断而存在缺口 | 策略内检测 time 间隔,缺口处跳过或标记 |
| 延迟数据 | 网络延迟导致最新 K 线不完整 | is_closed=False 的 K 线不应参与信号计算 |
| 精度损失 | Python float(IEEE 754 double)约 15-17 位有效数字,对标 NUMERIC(20,8) 足够,但极值场景需使用 Decimal |
价格/成交量使用 float;累积资金计算使用 Decimal |
10.2 策略相关风险
| 风险 | 说明 | 缓解措施 |
|---|---|---|
| 过拟合 | 参数在回测区间表现优异但实盘失效 | 使用 Walk-Forward 分析,保留样本外验证集 |
| 未来信息泄露 | 回测中不当使用了当前时间点之后的数据 | generate_signal() 仅允许访问截止当前 K 线的历史数据 |
| 滑点与手续费 | 回测未考虑实际交易成本 | 回测引擎中配置滑点模型和手续费率 |
10.3 跨模块一致性
| 检查项 | 验证方法 |
|---|---|
| Python 模型字段与 TypeORM 实体对齐 | 对照 data/db/entities/kline.entity.ts 逐一核对 |
| env.yaml 配置项双端一致 | 对照 data/config/index.ts 的 EnvConfig 类型 |
| K 线查询 SQL 的列名与实体一致 | 运行 __main__.py 示例验证查询返回数据 |
附录 A:依赖清单
# engine/pyproject.toml
[tool.poetry]
name = "trade-engine"
version = "0.1.0"
description = "数字货币量化交易系统 - Python 策略引擎"
authors = ["Trade Team"]
[tool.poetry.dependencies]
python = "^3.10"
asyncpg = "^0.29" # 异步 PostgreSQL 驱动(TimescaleDB 兼容)
pydantic = "^2.5" # 运行时类型校验(与 TypeScript Zod 对称)
pyyaml = "^6.0" # 解析 env.yaml(与 TypeScript yaml 包对称)
loguru = "^0.7" # 结构化日志
pandas = "^2.0" # 技术指标向量化计算
numpy = "^1.26" # 数值计算基础
[tool.poetry.group.dev.dependencies]
pytest = "^8.0"
pytest-asyncio = "^0.23"
mypy = "^1.8"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
附录 B:TypeScript 实体速查
以下为 data 模块实体的关键字段摘要,供 Python 开发时快速对照。 完整定义见
data/db/entities/kline.entity.ts。
| 实体 | 表名 | 主键 | 备注 |
|---|---|---|---|
Kline |
klines |
(exchange, symbol, interval, time) |
TimescaleDB hypertable,复合主键 |
Exchange |
exchanges |
id (UUID) |
TypeORM 管理域,继承 CommonBaseEntity |
TradingPair |
trading_pairs |
id (UUID) |
TypeORM 管理域,外键关联 exchanges |