diff --git a/engine/README.md b/engine/README.md new file mode 100644 index 0000000..a219ba9 --- /dev/null +++ b/engine/README.md @@ -0,0 +1,1152 @@ +# Engine Module — Python 策略引擎架构文档 + +> **模块定位**:数字货币量化交易系统的业务层,负责策略开发、回测分析、信号生成,从 TimescaleDB 读取 data 模块持久化的 K 线数据进行策略决策。 +> +> **运行时**:Python 3.10+ | **依赖管理**:Poetry / uv | **数据库驱动**:asyncpg | **配置解析**:PyYAML + +--- + +## 目录 + +1. [模块定位与架构边界](#1-模块定位与架构边界) +2. [目录结构](#2-目录结构) +3. [配置管理 — 读取 env.yaml](#3-配置管理--读取-envyaml) +4. [数据读取 — TimescaleDB K 线查询](#4-数据读取--timescaledb-k-线查询) +5. [实体映射 — Python ↔ TypeORM 类型对齐](#5-实体映射--python--typeorm-类型对齐) +6. [策略基类设计](#6-策略基类设计) +7. [回测引擎设计](#7-回测引擎设计) +8. [最小可运行示例](#8-最小可运行示例) +9. [性能考量](#9-性能考量) +10. [风险提示](#10-风险提示) + +--- + +## 1. 模块定位与架构边界 + +### 1.1 系统分层中的位置 + +``` +┌──────────────────────────────────────────────────────────────┐ +│ 🐍 Python Engine (本模块) │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌───────────────────┐ │ +│ │ 策略引擎 │ │ 回测引擎 │ │ 信号分发器 │ │ +│ │ (strategy/) │ │ (backtest/) │ │ (signals/) │ │ +│ └──────┬───────┘ └──────┬───────┘ └────────┬──────────┘ │ +│ │ │ │ │ +│ └─────────────────┼────────────────────┘ │ +│ │ │ +│ ┌────────────────────────▼──────────────────────────────┐ │ +│ │ 数据访问层 (data/) │ │ +│ │ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ TimescaleDB │ │ env.yaml │ │ Redis │ │ │ +│ │ │ Reader │ │ Config │ │ Subscriber │ │ │ +│ │ └─────────────┘ └──────────────┘ └──────────────┘ │ │ +│ └───────────────────────────────────────────────────────┘ │ +└──────────────────────────┬───────────────────────────────────┘ + │ + ┌──────────────────┼──────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ TimescaleDB │ │ env.yaml │ │ Redis │ +│ (data 模块 │ │ (项目根目录) │ │ (行情消息) │ +│ 写入的K线) │ │ │ │ │ +└──────────────┘ └──────────────┘ └──────────────┘ +``` + +### 1.2 与 data 模块的关系 + +| 维度 | data 模块 (TypeScript) | engine 模块 (Python) | +|------|----------------------|---------------------| +| **职责** | 行情采集、K 线合成、数据持久化 | 策略计算、回测、信号生成 | +| **数据库操作** | **写入**(UPSERT K 线到 TimescaleDB) | **只读**(SELECT K 线进行策略分析) | +| **配置来源** | `data/config/index.ts` 读取 `env.yaml` | `engine/config.py` 读取同一份 `env.yaml` | +| **实体定义** | TypeORM 装饰器实体 (`data/db/entities/`) | Pydantic/dataclass 模型(镜像 TypeORM 结构) | +| **运行时** | Bun / Node.js | CPython 3.10+ | + +> **设计原则**:engine 模块对 TimescaleDB **只读**,绝不写入。K 线数据的写入由 data 模块全权负责,避免双端写入导致的数据一致性问题。 + +--- + +## 2. 目录结构 + +``` +engine/ +├── README.md # 本架构文档 +├── pyproject.toml # Python 项目依赖(Poetry) +├── __init__.py +│ +├── config/ # 配置管理 +│ ├── __init__.py +│ └── settings.py # 读取 ../../env.yaml,Pydantic 校验 +│ +├── data/ # 数据访问层(只读 TimescaleDB) +│ ├── __init__.py +│ ├── db.py # asyncpg 连接池管理 +│ ├── reader.py # K 线查询方法封装 +│ └── models.py # Python 数据模型(镜像 TypeORM 实体) +│ +├── strategy/ # 策略实现 +│ ├── __init__.py +│ ├── base.py # BaseStrategy 抽象基类 +│ ├── ma_cross.py # 双均线交叉策略示例 +│ └── grid.py # 网格交易策略示例 +│ +├── backtest/ # 回测引擎 +│ ├── __init__.py +│ ├── engine.py # 事件驱动回测引擎 +│ └── metrics.py # 绩效指标计算(夏普比率、最大回撤等) +│ +├── signals/ # 信号分发 +│ ├── __init__.py +│ └── dispatcher.py # 信号生成与分发 +│ +├── common/ # 公共工具 +│ ├── __init__.py +│ ├── logger.py # loguru 日志配置 +│ └── utils.py # 时间工具、重试装饰器等 +│ +└── tests/ # 单元测试 + ├── __init__.py + ├── test_config.py + ├── test_reader.py + └── test_strategy.py +``` + +--- + +## 3. 配置管理 — 读取 env.yaml + +### 3.1 设计思路 + +engine 模块与 data 模块共享项目根目录的 `env.yaml`。Python 侧通过 `PyYAML` 解析,`Pydantic` 进行运行时校验——这与 TypeScript 侧 `data/config/index.ts` 使用 `yaml` + `zod` 的模式完全对称。 + +### 3.2 配置文件结构回顾 + +`env.yaml`(项目根目录,data 与 engine 共享): + +```yaml +# --- TimescaleDB / PostgreSQL 连接 --- +db: + host: localhost + port: 5432 + name: trade + user: trader + password: fucketh + +# --- Redis 连接 --- +redis: + url: redis://localhost:6379 + publish_enabled: true + +# --- 日志 --- +logging: + level: debug + node_env: development +``` + +### 3.3 Python 实现 + +```python +# engine/config/settings.py +"""中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。""" + +from pathlib import Path +from typing import Literal + +import yaml +from pydantic import BaseModel, Field, field_validator + + +# ============================================================ +# 1. Pydantic 校验模型(镜像 TypeScript config/validators.ts) +# ============================================================ + +class DbConfig(BaseModel): + """TimescaleDB / PostgreSQL 连接参数""" + host: str = "localhost" + port: int = Field(default=5432, ge=1, le=65535) + name: str # 对应 YAML 中的 name + user: str + password: str + + +class RedisConfig(BaseModel): + """Redis 连接配置""" + url: str = "redis://localhost:6379" + publish_enabled: bool = True + + +class LoggingConfig(BaseModel): + """日志配置""" + level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info" + node_env: Literal["development", "production"] = "development" + + +class EnvConfig(BaseModel): + """env.yaml 顶层结构""" + db: DbConfig + redis: RedisConfig + logging: LoggingConfig + + +# ============================================================ +# 2. 定位并加载 env.yaml +# ============================================================ + +def _get_project_root() -> Path: + """ + 计算项目根目录的绝对路径。 + engine/config/settings.py → engine/config/ → engine/ → / + """ + return Path(__file__).resolve().parent.parent.parent + + +def _load_yaml_config() -> dict: + """读取项目根目录 env.yaml,返回原始字典。文件不存在时抛出明确错误。""" + root = _get_project_root() + yaml_path = root / "env.yaml" + + if not yaml_path.exists(): + raise FileNotFoundError( + f"[config] 无法读取配置文件: {yaml_path}\n" + f"请确保项目根目录存在 env.yaml。" + ) + + with open(yaml_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + if data is None: + raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}") + + return data + + +# ============================================================ +# 3. 加载 & 校验 & 导出 +# ============================================================ + +_raw = _load_yaml_config() +_config = EnvConfig.model_validate(_raw) + +# --- 按职责分组的导出配置对象 --- + +# TimescaleDB 连接参数(可直接用于 asyncpg.create_pool) +db = _config.db + +# Redis 连接参数 +redis = _config.redis + +# 日志参数 +logging = _config.logging + +# --- asyncpg DSN(便捷构造)--- +DB_DSN = ( + f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" +) + + +def print_config_summary() -> None: + """打印脱敏后的配置概要(不含密码明文)""" + print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}") + print(f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}") + print(f"[config] Logging: level={logging.level}, env={logging.node_env}") +``` + +### 3.4 与 TypeScript 侧对比 + +| 步骤 | TypeScript (`data/config/index.ts`) | Python (`engine/config/settings.py`) | +|------|--------------------------------------|--------------------------------------| +| **路径计算** | `fileURLToPath(import.meta.url)` → `../..` | `Path(__file__).resolve().parent.parent.parent` | +| **YAML 解析** | `parse()` from `yaml` npm 包 | `yaml.safe_load()` from PyYAML | +| **类型校验** | `validateConfig()` 使用 `zod` | `EnvConfig.model_validate()` 使用 Pydantic | +| **导出方式** | 分组 `const` 对象 (`pgsql`, `redis`, `logging`) | 分组模块级变量 (`db`, `redis`, `logging`) | +| **DSN 构造** | 不构造,由 TypeORM DataSource 使用拆解字段 | 额外导出 `DB_DSN` 供 asyncpg 使用 | + +--- + +## 4. 数据读取 — TimescaleDB K 线查询 + +### 4.1 连接池管理 + +```python +# engine/data/db.py +"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。""" + +import asyncpg +from engine.config.settings import db as db_config, DB_DSN + +# 模块级连接池(单例) +_pool: asyncpg.Pool | None = None + + +async def get_pool() -> asyncpg.Pool: + """获取或创建 asyncpg 连接池(懒初始化,复用)。""" + global _pool + if _pool is None: + _pool = await asyncpg.create_pool( + dsn=DB_DSN, + min_size=2, + max_size=10, + command_timeout=30, # 查询超时 30 秒 + # 只读模式:在连接级别设置,防止误写 + server_settings={"default_transaction_read_only": "on"}, + ) + return _pool + + +async def close_pool() -> None: + """关闭连接池(应用退出时调用)""" + global _pool + if _pool: + await _pool.close() + _pool = None +``` + +### 4.2 K 线读取器 + +```python +# engine/data/reader.py +""" +K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。 + +所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。 +参考:data/db/entities/kline.entity.ts +""" + +from datetime import datetime +from typing import Optional, Sequence + +import asyncpg + +from engine.data.db import get_pool +from engine.data.models import KlineRecord + + +class KlineReader: + """TimescaleDB K 线只读查询器""" + + def __init__(self): + self._pool: Optional[asyncpg.Pool] = None + + async def _ensure_pool(self) -> asyncpg.Pool: + if self._pool is None: + self._pool = await get_pool() + return self._pool + + # ---------------------------------------------------------- + # 核心查询方法 + # ---------------------------------------------------------- + + async def get_klines( + self, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + exchange: str = "binance", + limit: int = 1000, + ) -> list[KlineRecord]: + """ + 查询指定时间范围内的 K 线数据。 + + 字段映射 — data/db/entities/kline.entity.ts: + time → Kline.time (timestamptz) + exchange → Kline.exchange (text) + symbol → Kline.symbol (text) + interval → Kline.interval (text) + open → Kline.open (numeric) + high → Kline.high (numeric) + low → Kline.low (numeric) + close → Kline.close (numeric) + volume → Kline.volume (numeric) + quote_volume → Kline.quote_volume (nullable) + trade_count → Kline.trade_count (nullable) + is_closed → Kline.is_closed (boolean) + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, + exchange, + symbol, + interval, + open, + high, + low, + close, + volume, + quote_volume, + taker_buy_base_vol, + taker_buy_quote_vol, + trade_count, + is_closed + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time >= $4 + AND time <= $5 + ORDER BY time ASC + LIMIT $6 + """ + + rows: Sequence[asyncpg.Record] = await pool.fetch( + query, + exchange, + symbol, + interval, + start_time, + end_time, + limit, + ) + + return [KlineRecord.from_record(row) for row in rows] + + async def get_latest_klines( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 500, + ) -> list[KlineRecord]: + """ + 获取最近 N 根已闭合的 K 线(策略启动时快速预热)。 + + 仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。 + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND is_closed = TRUE + ORDER BY time DESC + LIMIT $4 + """ + + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + # 正序返回(时间升序) + records = [KlineRecord.from_record(row) for row in rows] + records.reverse() + return records + + async def get_ohlcv_array( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 200, + ) -> list[tuple[datetime, float, float, float, float, float]]: + """ + 获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。 + + 返回格式: [(timestamp, open, high, low, close, volume), ...] + 时间升序排列。 + """ + pool = await self._ensure_pool() + + query = """ + SELECT time, open, high, low, close, volume + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + ORDER BY time DESC + LIMIT $4 + """ + + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + return [ + (row["time"], row["open"], row["high"], row["low"], row["close"], row["volume"]) + for row in reversed(rows) + ] + + async def get_available_symbols( + self, + exchange: str = "binance", + ) -> list[str]: + """查询 TimescaleDB 中有 K 线数据的交易对列表。""" + pool = await self._ensure_pool() + + rows = await pool.fetch( + """ + SELECT DISTINCT symbol + FROM klines + WHERE exchange = $1 + ORDER BY symbol + """, + exchange, + ) + + return [row["symbol"] for row in rows] +``` + +### 4.3 查询模式与索引对齐 + +Kline 实体的复合主键 `(exchange, symbol, interval, time)` 决定了最常用的查询模式。以下查询均能命中主键索引: + +| 查询场景 | SQL 条件 | 索引命中 | +|---------|---------|---------| +| 回测:某交易对某周期的时间范围 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND time>=$4 AND time<=$5 ORDER BY time` | ✅ 主键索引前导列精确匹配 | +| 最新 K 线:已闭合的最近 N 根 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND is_closed=TRUE ORDER BY time DESC LIMIT N` | ✅ 主键索引 + 顺序扫描(LIMIT 小) | +| 可用交易对枚举 | `SELECT DISTINCT symbol WHERE exchange=$1` | ⚠️ 全表扫描 — 建议从 `trading_pairs` 表获取 | + +> **性能建议**:`get_available_symbols()` 应优先查询 `trading_pairs` 表(关系数据,TypeORM 管理域),而非扫描 `klines` 时序表。 + +--- + +## 5. 实体映射 — Python ↔ TypeORM 类型对齐 + +### 5.1 映射策略 + +Python 侧的数据模型镜像 TypeScript 侧 TypeORM 实体,保证双端数据结构一致性。映射规则: + +``` +TypeScript (TypeORM Entity) → Python (Pydantic Model) +════════════════════════════ ════════════════════════ +@PrimaryColumn() timestamptz → datetime +@Column("text") → str +@Column("numeric", 20, 8) → float (Python float = C double, 足够 20 位精度) +@Column("boolean") → bool +@Column("integer") → int +@Column({ nullable: true }) → Optional[...] +``` + +### 5.2 Python 数据模型 + +```python +# engine/data/models.py +""" +Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。 + +命名约定: + - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair) + - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case + - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示 +""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +# ============================================================ +# Kline 模型 — 镜像 data/db/entities/kline.entity.ts +# ============================================================ + +class KlineRecord(BaseModel): + """ + K 线数据记录。 + + 映射关系: + TypeORM Kline.time → KlineRecord.time + TypeORM Kline.exchange → KlineRecord.exchange + TypeORM Kline.symbol → KlineRecord.symbol + TypeORM Kline.interval → KlineRecord.interval + TypeORM Kline.open → KlineRecord.open + TypeORM Kline.high → KlineRecord.high + TypeORM Kline.low → KlineRecord.low + TypeORM Kline.close → KlineRecord.close + TypeORM Kline.volume → KlineRecord.volume + TypeORM Kline.quote_volume → KlineRecord.quote_volume + TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol + TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol + TypeORM Kline.trade_count → KlineRecord.trade_count + TypeORM Kline.is_closed → KlineRecord.is_closed + """ + + time: datetime + exchange: str + symbol: str + interval: str + + # OHLCV + open: float + high: float + low: float + close: float + volume: float + + # 扩展字段(nullable → Optional) + quote_volume: Optional[float] = None + taker_buy_base_vol: Optional[float] = None + taker_buy_quote_vol: Optional[float] = None + trade_count: Optional[int] = None + + # 状态 + is_closed: bool = True + + @classmethod + def from_record(cls, record) -> "KlineRecord": + """ + 从 asyncpg.Record 构造 KlineRecord 实例。 + + asyncpg 返回的 Record 对象行为类似 dict, + 数值列自动转为 Python float/int,timestamptz 自动转为 datetime。 + """ + return cls( + time=record["time"], + exchange=record["exchange"], + symbol=record["symbol"], + interval=record["interval"], + open=float(record["open"]), + high=float(record["high"]), + low=float(record["low"]), + close=float(record["close"]), + volume=float(record["volume"]), + quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None, + taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None, + taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None, + trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None, + is_closed=bool(record["is_closed"]), + ) + + +# ============================================================ +# TradingPair 模型 — 镜像 data/db/entities/trading-pair.entity.ts +# ============================================================ + +class TradingPairInfo(BaseModel): + """ + 交易对配置信息(轻量版,仅包含策略决策所需的字段)。 + + 完整实体参考 data/db/entities/trading-pair.entity.ts + """ + symbol: str + exchange: str + base_asset: str + quote_asset: str + price_precision: int + quantity_precision: int + min_qty: Optional[float] = None + min_notional: Optional[float] = None + active: bool = True + + +# ============================================================ +# 策略信号模型 +# ============================================================ + +class Signal(BaseModel): + """ + 交易信号。 + + 策略 generate_signal() 方法的返回值, + 由 BaseStrategy.do_action() 消费执行。 + """ + symbol: str + signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$") + price: Optional[float] = None # 限价单价格(None = 市价单) + quantity: Optional[float] = None # 下单数量(None = 使用风控模块计算的默认值) + reason: str = "" # 信号产生原因(便于日志审计) + timestamp: datetime = Field(default_factory=datetime.utcnow) +``` + +### 5.3 字段对应速查表 + +| TypeORM Entity (`data/db/entities/kline.entity.ts`) | Python Model (`engine/data/models.py`) | DB 列类型 | Python 类型 | +|-----------------------------------------------------|----------------------------------------|----------|------------| +| `time` (PrimaryColumn, timestamptz) | `KlineRecord.time` | `TIMESTAMPTZ` | `datetime` | +| `exchange` (PrimaryColumn, text) | `KlineRecord.exchange` | `TEXT` | `str` | +| `symbol` (PrimaryColumn, text) | `KlineRecord.symbol` | `TEXT` | `str` | +| `interval` (PrimaryColumn, text) | `KlineRecord.interval` | `TEXT` | `str` | +| `open` (numeric 20,8) | `KlineRecord.open` | `NUMERIC(20,8)` | `float` | +| `high` (numeric 20,8) | `KlineRecord.high` | `NUMERIC(20,8)` | `float` | +| `low` (numeric 20,8) | `KlineRecord.low` | `NUMERIC(20,8)` | `float` | +| `close` (numeric 20,8) | `KlineRecord.close` | `NUMERIC(20,8)` | `float` | +| `volume` (numeric 20,8) | `KlineRecord.volume` | `NUMERIC(20,8)` | `float` | +| `quote_volume` (nullable) | `KlineRecord.quote_volume` | `NUMERIC(20,8)` | `Optional[float]` | +| `taker_buy_base_vol` (nullable) | `KlineRecord.taker_buy_base_vol` | `NUMERIC(20,8)` | `Optional[float]` | +| `taker_buy_quote_vol` (nullable) | `KlineRecord.taker_buy_quote_vol` | `NUMERIC(20,8)` | `Optional[float]` | +| `trade_count` (nullable) | `KlineRecord.trade_count` | `INTEGER` | `Optional[int]` | +| `is_closed` | `KlineRecord.is_closed` | `BOOLEAN` | `bool` | + +--- + +## 6. 策略基类设计 + +```python +# engine/strategy/base.py +"""策略抽象基类 —— 所有交易策略的父类。""" + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Optional + +from engine.data.models import KlineRecord, Signal + + +class BaseStrategy(ABC): + """ + 策略基类。 + + 子类必须实现: + - generate_signal(): 基于 K 线数据生成交易信号 + - on_kline(): K 线到达时的回调(可在此更新内部状态) + + 生命周期: + 1. __init__() — 初始化策略参数和内状态 + 2. on_kline() — 每根新 K 线到达时调用(更新内部指标状态) + 3. generate_signal() — 被回测引擎/实盘调度器调用,生成买卖信号 + """ + + def __init__(self, symbol: str, interval: str, **kwargs): + self.symbol = symbol + self.interval = interval + self._klines: list[KlineRecord] = [] # 内部 K 线缓存 + self._last_signal: Optional[Signal] = None + + @abstractmethod + async def generate_signal(self) -> Optional[Signal]: + """ + 基于当前缓存的 K 线数据生成交易信号。 + + Returns: + Signal(BUY/SELL) — 触发交易 + None — 无信号,继续持仓/观望 + """ + ... + + async def on_kline(self, kline: KlineRecord) -> None: + """ + 新 K 线到达回调(默认实现:追加到缓存)。 + + 子类可重写以更新技术指标缓存(如 MA、RSI 中间计算结果)。 + """ + self._klines.append(kline) + + async def warm_up(self, klines: list[KlineRecord]) -> None: + """ + 策略预热:加载历史 K 线,初始化内部状态。 + + 回测引擎在开始回测前调用;实盘调度器在策略启动时调用。 + """ + self._klines = sorted(klines, key=lambda k: k.time) + + @property + def is_ready(self) -> bool: + """策略是否已就绪(缓存了足够的历史 K 线)""" + return len(self._klines) >= self.min_klines_required + + @property + @abstractmethod + def min_klines_required(self) -> int: + """策略正常运行所需的最少 K 线数量(如双均线策略需要 slow_period+1 根)""" + ... +``` + +--- + +## 7. 回测引擎设计 + +```python +# engine/backtest/engine.py +""" +事件驱动回测引擎。 + +核心流程: + 1. 从 TimescaleDB 加载历史 K 线 + 2. 按时间顺序逐根喂给策略 + 3. 收集信号 → 模拟成交 → 计算收益 + 4. 输出绩效报告(夏普比率、最大回撤、胜率等) +""" + +from dataclasses import dataclass, field +from datetime import datetime + +from engine.data.reader import KlineReader +from engine.data.models import KlineRecord, Signal +from engine.strategy.base import BaseStrategy + + +@dataclass +class BacktestResult: + """回测结果""" + symbol: str + interval: str + start_time: datetime + end_time: datetime + total_klines: int + total_signals: int + total_return: float # 总收益率 + annual_return: float # 年化收益率 + max_drawdown: float # 最大回撤 + sharpe_ratio: float # 夏普比率 + win_rate: float # 胜率 + profit_factor: float # 盈亏比 + equity_curve: list[float] = field(default_factory=list) # 权益曲线 + + +class BacktestEngine: + """回测引擎""" + + def __init__(self, reader: KlineReader): + self.reader = reader + + async def run( + self, + strategy: BaseStrategy, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + exchange: str = "binance", + ) -> BacktestResult: + """ + 执行回测。 + + Args: + strategy: 待回测的策略实例 + symbol: 交易对 + interval: K 线周期 + start_time: 回测起始时间 + end_time: 回测结束时间 + exchange: 交易所 + """ + # 1. 加载历史 K 线 + klines = await self.reader.get_klines( + symbol=symbol, + interval=interval, + start_time=start_time, + end_time=end_time, + exchange=exchange, + ) + + if len(klines) == 0: + raise ValueError(f"回测区间内无 K 线数据: {symbol} {interval} [{start_time}, {end_time}]") + + # 2. 策略预热 + await strategy.warm_up(klines) + + # 3. 逐根 K 线回放 + signals: list[Signal] = [] + for kline in klines: + await strategy.on_kline(kline) + if strategy.is_ready: + signal = await strategy.generate_signal() + if signal is not None: + signals.append(signal) + + # 4. 计算绩效指标 + return self._compute_metrics( + symbol=symbol, + interval=interval, + start_time=start_time, + end_time=end_time, + klines=klines, + signals=signals, + ) + + def _compute_metrics( + self, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + klines: list[KlineRecord], + signals: list[Signal], + ) -> BacktestResult: + """计算回测绩效指标(框架方法,实际计算委托 metrics.py)""" + from engine.backtest.metrics import compute_metrics + + return compute_metrics( + symbol=symbol, + interval=interval, + start_time=start_time, + end_time=end_time, + klines=klines, + signals=signals, + ) +``` + +--- + +## 8. 最小可运行示例 + +### 8.1 安装依赖 + +```bash +# engine/pyproject.toml +[tool.poetry.dependencies] +python = "^3.10" +asyncpg = "^0.29" +pydantic = "^2.5" +pyyaml = "^6.0" +loguru = "^0.7" +pandas = "^2.0" +numpy = "^1.26" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" +``` + +```bash +cd engine +poetry install +``` + +### 8.2 运行示例 + +```python +# engine/__main__.py +""" +最小可运行示例 —— 验证 TimescaleDB 连接、配置读取、K 线查询。 + +运行方式: + cd engine + poetry run python -m engine +""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from engine.config.settings import print_config_summary, db as db_config +from engine.data.db import get_pool, close_pool +from engine.data.reader import KlineReader + + +async def main(): + # 1. 打印配置概要(验证 env.yaml 读取) + print_config_summary() + + # 2. 测试 TimescaleDB 连接 + pool = await get_pool() + async with pool.acquire() as conn: + version = await conn.fetchval("SELECT version()") + print(f"[db] TimescaleDB 连接成功: {version}") + + # 3. 查询最近 10 根 BTCUSDT 1h K 线 + reader = KlineReader() + end = datetime.now(timezone.utc) + start = end - timedelta(hours=24) + + klines = await reader.get_klines( + symbol="BTCUSDT", + interval="1h", + start_time=start, + end_time=end, + exchange="binance", + limit=10, + ) + + print(f"\n[data] 查询到 {len(klines)} 根 K 线:") + for k in klines[:5]: + print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} " + f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}") + + # 4. 清理 + await close_pool() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### 8.3 完整启动流程 + +```python +# engine/run.py +"""策略引擎完整启动流程示例""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from engine.config.settings import print_config_summary +from engine.data.db import get_pool, close_pool +from engine.data.reader import KlineReader +from engine.strategy.ma_cross import MACrossStrategy # 双均线策略示例 +from engine.backtest.engine import BacktestEngine + + +async def run_backtest(): + print_config_summary() + + reader = KlineReader() + engine = BacktestEngine(reader) + + # 策略参数 + strategy = MACrossStrategy( + symbol="BTCUSDT", + interval="1h", + fast_period=5, # 快线周期 + slow_period=20, # 慢线周期 + ) + + # 回测区间:最近 90 天 + end = datetime.now(timezone.utc) + start = end - timedelta(days=90) + + print(f"\n[backtest] 开始回测: {strategy.symbol} {strategy.interval} " + f"[{start.date()} ~ {end.date()}]") + + result = await engine.run( + strategy=strategy, + symbol="BTCUSDT", + interval="1h", + start_time=start, + end_time=end, + ) + + # 输出绩效 + print(f"\n{'='*50}") + print(f"回测绩效报告") + print(f"{'='*50}") + print(f" K 线总数: {result.total_klines}") + print(f" 交易信号数: {result.total_signals}") + print(f" 总收益率: {result.total_return:.2%}") + print(f" 年化收益率: {result.annual_return:.2%}") + print(f" 最大回撤: {result.max_drawdown:.2%}") + print(f" 夏普比率: {result.sharpe_ratio:.2f}") + print(f" 胜率: {result.win_rate:.2%}") + print(f" 盈亏比: {result.profit_factor:.2f}") + print(f"{'='*50}") + + await close_pool() + + +if __name__ == "__main__": + asyncio.run(run_backtest()) +``` + +--- + +## 9. 性能考量 + +### 9.1 查询优化 + +| 场景 | 优化策略 | 效果 | +|------|---------|------| +| **大数据量回测**(> 10 万根 K 线) | 使用 `asyncpg.Cursor` 流式读取,避免一次性加载到内存 | 内存峰值从 O(n) 降到 O(batch_size) | +| **多币种并发查询** | `asyncio.gather()` 并行查询多个 symbol | N 个币种查询时间 ≈ max(单币种时间),而非 sum | +| **重复区间查询** | 策略层缓存已查询的 K 线,避免重复 DB 查询 | 减少 90% 的重复 I/O | +| **高周期 K 线** | 若 TimescaleDB 配置了连续聚合视图,直接查询物化视图而非原始 1m 表 | 查询速度提升 10-100× | + +### 9.2 内存管理 + +```python +# 流式读取大数据量 K 线(避免 OOM) +async def get_klines_streaming( + pool: asyncpg.Pool, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + batch_size: int = 5000, +): + """流式读取 K 线,每次返回 batch_size 条,适合百万级回测。""" + async with pool.acquire() as conn: + async with conn.transaction(): + cursor = await conn.cursor( + """ + SELECT * FROM klines + WHERE exchange=$1 AND symbol=$2 AND interval=$3 + AND time>=$4 AND time<=$5 + ORDER BY time ASC + """ + ) + # 省略参数绑定细节... + async for row in cursor: + yield KlineRecord.from_record(row) +``` + +### 9.3 向量化计算建议 + +策略中的技术指标计算(MA、RSI、MACD 等)若涉及全量历史数据,应使用 `pandas` + `numpy` 向量化计算,而非 Python 循环: + +```python +import pandas as pd +import numpy as np + +def compute_sma_vectorized(klines: list[KlineRecord], period: int) -> np.ndarray: + """向量化计算简单移动平均 — 比纯 Python 循环快 50-100×""" + closes = np.array([k.close for k in klines], dtype=np.float64) + return pd.Series(closes).rolling(window=period).mean().to_numpy() +``` + +--- + +## 10. 风险提示 + +> ⚠️ **风险声明**:数字货币交易具有高风险,本系统仅提供技术执行工具。策略盈亏由使用者自负,开发者不做任何收益承诺。 + +### 10.1 数据相关风险 + +| 风险 | 说明 | 缓解措施 | +|------|------|---------| +| **数据缺口** | TimescaleDB 中 K 线可能因 data 模块采集中断而存在缺口 | 策略内检测 `time` 间隔,缺口处跳过或标记 | +| **延迟数据** | 网络延迟导致最新 K 线不完整 | `is_closed=False` 的 K 线不应参与信号计算 | +| **精度损失** | Python `float`(IEEE 754 double)约 15-17 位有效数字,对标 `NUMERIC(20,8)` 足够,但极值场景需使用 `Decimal` | 价格/成交量使用 `float`;累积资金计算使用 `Decimal` | + +### 10.2 策略相关风险 + +| 风险 | 说明 | 缓解措施 | +|------|------|---------| +| **过拟合** | 参数在回测区间表现优异但实盘失效 | 使用 Walk-Forward 分析,保留样本外验证集 | +| **未来信息泄露** | 回测中不当使用了当前时间点之后的数据 | `generate_signal()` 仅允许访问截止当前 K 线的历史数据 | +| **滑点与手续费** | 回测未考虑实际交易成本 | 回测引擎中配置滑点模型和手续费率 | + +### 10.3 跨模块一致性 + +| 检查项 | 验证方法 | +|--------|---------| +| Python 模型字段与 TypeORM 实体对齐 | 对照 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts) 逐一核对 | +| env.yaml 配置项双端一致 | 对照 [`data/config/index.ts`](../data/config/index.ts) 的 `EnvConfig` 类型 | +| K 线查询 SQL 的列名与实体一致 | 运行 `__main__.py` 示例验证查询返回数据 | + +--- + +## 附录 A:依赖清单 + +```toml +# engine/pyproject.toml +[tool.poetry] +name = "trade-engine" +version = "0.1.0" +description = "数字货币量化交易系统 - Python 策略引擎" +authors = ["Trade Team"] + +[tool.poetry.dependencies] +python = "^3.10" +asyncpg = "^0.29" # 异步 PostgreSQL 驱动(TimescaleDB 兼容) +pydantic = "^2.5" # 运行时类型校验(与 TypeScript Zod 对称) +pyyaml = "^6.0" # 解析 env.yaml(与 TypeScript yaml 包对称) +loguru = "^0.7" # 结构化日志 +pandas = "^2.0" # 技术指标向量化计算 +numpy = "^1.26" # 数值计算基础 + +[tool.poetry.group.dev.dependencies] +pytest = "^8.0" +pytest-asyncio = "^0.23" +mypy = "^1.8" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" +``` + +--- + +## 附录 B:TypeScript 实体速查 + +> 以下为 data 模块实体的关键字段摘要,供 Python 开发时快速对照。 +> 完整定义见 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts)。 + +| 实体 | 表名 | 主键 | 备注 | +|------|------|------|------| +| `Kline` | `klines` | `(exchange, symbol, interval, time)` | TimescaleDB hypertable,复合主键 | +| `Exchange` | `exchanges` | `id` (UUID) | TypeORM 管理域,继承 CommonBaseEntity | +| `TradingPair` | `trading_pairs` | `id` (UUID) | TypeORM 管理域,外键关联 exchanges | diff --git a/engine/config/__init__.py b/engine/config/__init__.py new file mode 100644 index 0000000..2358193 --- /dev/null +++ b/engine/config/__init__.py @@ -0,0 +1,11 @@ +from engine.config.settings import ( # noqa: F401 + DB_DSN, + DbConfig, + EnvConfig, + LoggingConfig, + RedisConfig, + db, + logging, + print_config_summary, + redis, +) diff --git a/engine/config/__pycache__/__init__.cpython-314.pyc b/engine/config/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..67627b4 Binary files /dev/null and b/engine/config/__pycache__/__init__.cpython-314.pyc differ diff --git a/engine/config/__pycache__/settings.cpython-314.pyc b/engine/config/__pycache__/settings.cpython-314.pyc new file mode 100644 index 0000000..97002ec Binary files /dev/null and b/engine/config/__pycache__/settings.cpython-314.pyc differ diff --git a/engine/config/settings.py b/engine/config/settings.py new file mode 100644 index 0000000..09131c9 --- /dev/null +++ b/engine/config/settings.py @@ -0,0 +1,81 @@ +"""中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。""" + +from pathlib import Path +from typing import Literal + +import yaml +from pydantic import BaseModel, Field + + +class DbConfig(BaseModel): + """TimescaleDB / PostgreSQL 连接参数""" + + host: str = "localhost" + port: int = Field(default=5432, ge=1, le=65535) + name: str + user: str + password: str + + +class RedisConfig(BaseModel): + """Redis 连接配置""" + + url: str = "redis://localhost:6379" + publish_enabled: bool = True + + +class LoggingConfig(BaseModel): + """日志配置""" + + level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info" + node_env: Literal["development", "production"] = "development" + + +class EnvConfig(BaseModel): + """env.yaml 顶层结构""" + + db: DbConfig + redis: RedisConfig + logging: LoggingConfig + + +def _get_project_root() -> Path: + return Path(__file__).resolve().parent.parent.parent + + +def _load_yaml_config() -> dict: + root = _get_project_root() + yaml_path = root / "env.yaml" + + if not yaml_path.exists(): + raise FileNotFoundError( + f"[config] 无法读取配置文件: {yaml_path}\n" + f"请确保项目根目录存在 env.yaml。" + ) + + with open(yaml_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + if data is None: + raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}") + + return data + + +_raw = _load_yaml_config() +_config = EnvConfig.model_validate(_raw) + +db = _config.db +redis = _config.redis +logging = _config.logging + +DB_DSN = f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" + + +def print_config_summary() -> None: + """打印脱敏后的配置概要(不含密码明文)""" + print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}") + print( + f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}" + ) + print(f"[config] Logging: level={logging.level}, env={logging.node_env}") diff --git a/engine/data/__init__.py b/engine/data/__init__.py new file mode 100644 index 0000000..6634846 --- /dev/null +++ b/engine/data/__init__.py @@ -0,0 +1,3 @@ +from engine.data.db import close_pool, get_pool # noqa: F401 +from engine.data.models import KlineRecord, Signal, TradingPairInfo # noqa: F401 +from engine.data.reader import KlineReader # noqa: F401 diff --git a/engine/data/__pycache__/__init__.cpython-314.pyc b/engine/data/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..bce53ef Binary files /dev/null and b/engine/data/__pycache__/__init__.cpython-314.pyc differ diff --git a/engine/data/__pycache__/db.cpython-314.pyc b/engine/data/__pycache__/db.cpython-314.pyc new file mode 100644 index 0000000..e111300 Binary files /dev/null and b/engine/data/__pycache__/db.cpython-314.pyc differ diff --git a/engine/data/__pycache__/models.cpython-314.pyc b/engine/data/__pycache__/models.cpython-314.pyc new file mode 100644 index 0000000..c0881b9 Binary files /dev/null and b/engine/data/__pycache__/models.cpython-314.pyc differ diff --git a/engine/data/__pycache__/reader.cpython-314.pyc b/engine/data/__pycache__/reader.cpython-314.pyc new file mode 100644 index 0000000..5f69428 Binary files /dev/null and b/engine/data/__pycache__/reader.cpython-314.pyc differ diff --git a/engine/data/db.py b/engine/data/db.py new file mode 100644 index 0000000..8816d7c --- /dev/null +++ b/engine/data/db.py @@ -0,0 +1,28 @@ +"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。""" + +import asyncpg +from engine.config.settings import DB_DSN, db as db_config + +_pool: asyncpg.Pool | None = None + + +async def get_pool() -> asyncpg.Pool: + """获取或创建 asyncpg 连接池(懒初始化,复用)。""" + global _pool + if _pool is None: + _pool = await asyncpg.create_pool( + dsn=DB_DSN, + min_size=2, + max_size=10, + command_timeout=30, + server_settings={"default_transaction_read_only": "on"}, + ) + return _pool + + +async def close_pool() -> None: + """关闭连接池(应用退出时调用)""" + global _pool + if _pool: + await _pool.close() + _pool = None diff --git a/engine/data/models.py b/engine/data/models.py new file mode 100644 index 0000000..0f4714e --- /dev/null +++ b/engine/data/models.py @@ -0,0 +1,100 @@ +""" +Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。 + +命名约定: + - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair) + - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case + - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示 +""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +class KlineRecord(BaseModel): + """ + K 线数据记录。 + + 映射关系: + TypeORM Kline.time → KlineRecord.time + TypeORM Kline.exchange → KlineRecord.exchange + TypeORM Kline.symbol → KlineRecord.symbol + TypeORM Kline.interval → KlineRecord.interval + TypeORM Kline.open → KlineRecord.open + TypeORM Kline.high → KlineRecord.high + TypeORM Kline.low → KlineRecord.low + TypeORM Kline.close → KlineRecord.close + TypeORM Kline.volume → KlineRecord.volume + TypeORM Kline.quote_volume → KlineRecord.quote_volume + TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol + TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol + TypeORM Kline.trade_count → KlineRecord.trade_count + TypeORM Kline.is_closed → KlineRecord.is_closed + """ + + time: datetime + exchange: str + symbol: str + interval: str + + open: float + high: float + low: float + close: float + volume: float + + quote_volume: Optional[float] = None + taker_buy_base_vol: Optional[float] = None + taker_buy_quote_vol: Optional[float] = None + trade_count: Optional[int] = None + is_closed: bool = True + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + @classmethod + def from_record(cls, record) -> "KlineRecord": + return cls( + time=record["time"], + exchange=record["exchange"], + symbol=record["symbol"], + interval=record["interval"], + open=float(record["open"]), + high=float(record["high"]), + low=float(record["low"]), + close=float(record["close"]), + volume=float(record["volume"]), + quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None, + taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None, + taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None, + trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None, + is_closed=bool(record["is_closed"]), + created_at=record["created_at"] if "created_at" in record else None, + updated_at=record["updated_at"] if "updated_at" in record else None, + ) + + +class TradingPairInfo(BaseModel): + """交易对配置信息(轻量版,仅包含策略决策所需的字段)""" + + symbol: str + exchange: str + base_asset: str + quote_asset: str + price_precision: int + quantity_precision: int + min_qty: Optional[float] = None + min_notional: Optional[float] = None + active: bool = True + + +class Signal(BaseModel): + """交易信号""" + + symbol: str + signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$") + price: Optional[float] = None + quantity: Optional[float] = None + reason: str = "" + timestamp: datetime = Field(default_factory=datetime.utcnow) diff --git a/engine/data/reader.py b/engine/data/reader.py new file mode 100644 index 0000000..301409d --- /dev/null +++ b/engine/data/reader.py @@ -0,0 +1,278 @@ +""" +K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。 + +所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。 +参考:data/db/entities/kline.entity.ts +""" + +from datetime import datetime +from typing import Optional, Sequence + +import asyncpg + +from engine.data.db import get_pool +from engine.data.models import KlineRecord + + +class KlineReader: + """TimescaleDB K 线只读查询器""" + + def __init__(self): + self._pool: Optional[asyncpg.Pool] = None + + async def _ensure_pool(self) -> asyncpg.Pool: + if self._pool is None: + self._pool = await get_pool() + return self._pool + + async def get_klines( + self, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + exchange: str = "binance", + limit: int = 1000, + ) -> list[KlineRecord]: + """ + 查询指定时间范围内的 K 线数据,支持任意周期(1m / 5m / 15m / 1h / 4h / 1d 等)。 + + Args: + symbol: 交易对(如 BTCUSDT) + interval: K 线周期(1m / 5m / 15m / 30m / 1h / 4h / 1d 等) + start_time: 起始时间(含) + end_time: 结束时间(含) + exchange: 交易所标识 + limit: 最大返回条数 + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time >= $4 + AND time <= $5 + ORDER BY time ASC + LIMIT $6 + """ + + rows: Sequence[asyncpg.Record] = await pool.fetch( + query, + exchange, + symbol, + interval, + start_time, + end_time, + limit, + ) + + return [KlineRecord.from_record(row) for row in rows] + + async def get_latest_klines( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 500, + ) -> list[KlineRecord]: + """ + 获取最近 N 根已闭合的 K 线(策略启动时快速预热)。 + + 仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。 + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND is_closed = TRUE + ORDER BY time DESC + LIMIT $4 + """ + + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + records = [KlineRecord.from_record(row) for row in rows] + records.reverse() + return records + + async def get_klines_by_count( + self, + symbol: str, + interval: str, + count: int = 100, + exchange: str = "binance", + before_time: Optional[datetime] = None, + ) -> list[KlineRecord]: + """ + 获取最新的 N 根 K 线(按时间倒序取 N 条再正序返回)。 + + 适合策略运行时获取最近 N 根 K 线做指标计算,不关心特定时间范围。 + + Args: + symbol: 交易对 + interval: K 线周期 + count: 需要获取的 K 线数量 + exchange: 交易所标识 + before_time: 可选,只获取该时间之前的 K 线 + """ + pool = await self._ensure_pool() + + if before_time: + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time <= $4 + ORDER BY time DESC + LIMIT $5 + """ + rows = await pool.fetch(query, exchange, symbol, interval, before_time, count) + else: + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + ORDER BY time DESC + LIMIT $4 + """ + rows = await pool.fetch(query, exchange, symbol, interval, count) + + records = [KlineRecord.from_record(row) for row in rows] + records.reverse() + return records + + async def get_ohlcv_array( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 200, + before_time: Optional[datetime] = None, + ) -> list[tuple[datetime, float, float, float, float, float]]: + """ + 获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。 + + 返回格式: [(timestamp, open, high, low, close, volume), ...] + 时间升序排列。 + """ + pool = await self._ensure_pool() + + if before_time: + query = """ + SELECT time, open, high, low, close, volume + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time <= $4 + ORDER BY time DESC + LIMIT $5 + """ + rows = await pool.fetch(query, exchange, symbol, interval, before_time, limit) + else: + query = """ + SELECT time, open, high, low, close, volume + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + ORDER BY time DESC + LIMIT $4 + """ + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + return [ + (row["time"], float(row["open"]), float(row["high"]), float(row["low"]), float(row["close"]), float(row["volume"])) + for row in reversed(rows) + ] + + async def get_available_symbols( + self, + exchange: str = "binance", + ) -> list[str]: + """ + 查询已激活的交易对列表。 + + 从 trading_pairs 配置表读取(仅 active=TRUE 的记录), + 而非扫描 klines 时序表,避免全表扫描。 + 对应 data/db/entities/trading-pair.entity.ts。 + """ + pool = await self._ensure_pool() + + rows = await pool.fetch( + """ + SELECT tp.symbol + FROM trading_pairs tp + JOIN exchanges e ON tp.exchange_id = e.id + WHERE e.name = $1 + AND tp.active = TRUE + ORDER BY tp.symbol + """, + exchange, + ) + + return [row["symbol"] for row in rows] + + async def get_available_intervals( + self, + symbol: str, + exchange: str = "binance", + ) -> list[str]: + """ + 查询某交易对配置的 K 线周期列表。 + + 从 trading_pairs.kline_intervals 读取(逗号分隔字符串), + 而非扫描 klines 时序表,避免全表扫描。 + 对应 data/db/entities/trading-pair.entity.ts: kline_intervals 字段。 + """ + pool = await self._ensure_pool() + + row = await pool.fetchrow( + """ + SELECT tp.kline_intervals + FROM trading_pairs tp + JOIN exchanges e ON tp.exchange_id = e.id + WHERE e.name = $1 + AND tp.symbol = $2 + AND tp.active = TRUE + """, + exchange, + symbol, + ) + + if row is None or not row["kline_intervals"]: + return [] + + # kline_intervals 格式: "1m,5m,15m,1h,4h,1d" + return [ + interval.strip() + for interval in row["kline_intervals"].split(",") + if interval.strip() + ] diff --git a/engine/example/__init__.py b/engine/example/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/engine/example/__pycache__/__init__.cpython-314.pyc b/engine/example/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..f7a3d9b Binary files /dev/null and b/engine/example/__pycache__/__init__.cpython-314.pyc differ diff --git a/engine/example/__pycache__/test_db.cpython-314.pyc b/engine/example/__pycache__/test_db.cpython-314.pyc new file mode 100644 index 0000000..87e0f25 Binary files /dev/null and b/engine/example/__pycache__/test_db.cpython-314.pyc differ diff --git a/engine/example/test_db.py b/engine/example/test_db.py new file mode 100644 index 0000000..2d9f80b --- /dev/null +++ b/engine/example/test_db.py @@ -0,0 +1,44 @@ +""" +数据库模块测试示例。 + +运行方式(在项目根目录 trade/ 下): + python -m engine.example.test_db + +前提条件: + docker compose up -d + data 模块已同步过 K 线数据到 TimescaleDB +""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from engine.config import print_config_summary +from engine.data import KlineRecord, KlineReader, close_pool + +async def main(): + print("=" * 60) + print(" Trade Engine — 数据库模块测试") + print("=" * 60) + + print_config_summary() + + reader = KlineReader() + + # 获取最近 100 根 BTCUSDT 5m K 线 + klines = await reader.get_klines_by_count( + symbol="BTCUSDT", + interval="1m", + count=5, + ) + print(f"\n查询到 {len(klines)} 根 K 线:") + for k in klines[:5]: + print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} " + f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}") + if len(klines) > 5: + print(f" ... 剩余 {len(klines) - 5} 根") + + await close_pool() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/engine/pyproject.toml b/engine/pyproject.toml new file mode 100644 index 0000000..dd3fbca --- /dev/null +++ b/engine/pyproject.toml @@ -0,0 +1,14 @@ +[project] +name = "trade-engine" +version = "0.1.0" +description = "量化交易策略引擎" +requires-python = ">=3.10" +dependencies = [ + "asyncpg>=0.29", + "pydantic>=2.5", + "pyyaml>=6.0", +] + +[tool.pyright] +venvPath = "." +venv = ".venv"