diff --git a/engine/README.md b/engine/README.md deleted file mode 100644 index a219ba9..0000000 --- a/engine/README.md +++ /dev/null @@ -1,1152 +0,0 @@ -# Engine Module — Python 策略引擎架构文档 - -> **模块定位**:数字货币量化交易系统的业务层,负责策略开发、回测分析、信号生成,从 TimescaleDB 读取 data 模块持久化的 K 线数据进行策略决策。 -> -> **运行时**:Python 3.10+ | **依赖管理**:Poetry / uv | **数据库驱动**:asyncpg | **配置解析**:PyYAML - ---- - -## 目录 - -1. [模块定位与架构边界](#1-模块定位与架构边界) -2. [目录结构](#2-目录结构) -3. [配置管理 — 读取 env.yaml](#3-配置管理--读取-envyaml) -4. [数据读取 — TimescaleDB K 线查询](#4-数据读取--timescaledb-k-线查询) -5. [实体映射 — Python ↔ TypeORM 类型对齐](#5-实体映射--python--typeorm-类型对齐) -6. [策略基类设计](#6-策略基类设计) -7. [回测引擎设计](#7-回测引擎设计) -8. [最小可运行示例](#8-最小可运行示例) -9. [性能考量](#9-性能考量) -10. [风险提示](#10-风险提示) - ---- - -## 1. 模块定位与架构边界 - -### 1.1 系统分层中的位置 - -``` -┌──────────────────────────────────────────────────────────────┐ -│ 🐍 Python Engine (本模块) │ -│ │ -│ ┌──────────────┐ ┌──────────────┐ ┌───────────────────┐ │ -│ │ 策略引擎 │ │ 回测引擎 │ │ 信号分发器 │ │ -│ │ (strategy/) │ │ (backtest/) │ │ (signals/) │ │ -│ └──────┬───────┘ └──────┬───────┘ └────────┬──────────┘ │ -│ │ │ │ │ -│ └─────────────────┼────────────────────┘ │ -│ │ │ -│ ┌────────────────────────▼──────────────────────────────┐ │ -│ │ 数据访问层 (data/) │ │ -│ │ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ -│ │ │ TimescaleDB │ │ env.yaml │ │ Redis │ │ │ -│ │ │ Reader │ │ Config │ │ Subscriber │ │ │ -│ │ └─────────────┘ └──────────────┘ └──────────────┘ │ │ -│ └───────────────────────────────────────────────────────┘ │ -└──────────────────────────┬───────────────────────────────────┘ - │ - ┌──────────────────┼──────────────────┐ - │ │ │ - ▼ ▼ ▼ -┌──────────────┐ ┌──────────────┐ ┌──────────────┐ -│ TimescaleDB │ │ env.yaml │ │ Redis │ -│ (data 模块 │ │ (项目根目录) │ │ (行情消息) │ -│ 写入的K线) │ │ │ │ │ -└──────────────┘ └──────────────┘ └──────────────┘ -``` - -### 1.2 与 data 模块的关系 - -| 维度 | data 模块 (TypeScript) | engine 模块 (Python) | -|------|----------------------|---------------------| -| **职责** | 行情采集、K 线合成、数据持久化 | 策略计算、回测、信号生成 | -| **数据库操作** | **写入**(UPSERT K 线到 TimescaleDB) | **只读**(SELECT K 线进行策略分析) | -| **配置来源** | `data/config/index.ts` 读取 `env.yaml` | `engine/config.py` 读取同一份 `env.yaml` | -| **实体定义** | TypeORM 装饰器实体 (`data/db/entities/`) | Pydantic/dataclass 模型(镜像 TypeORM 结构) | -| **运行时** | Bun / Node.js | CPython 3.10+ | - -> **设计原则**:engine 模块对 TimescaleDB **只读**,绝不写入。K 线数据的写入由 data 模块全权负责,避免双端写入导致的数据一致性问题。 - ---- - -## 2. 目录结构 - -``` -engine/ -├── README.md # 本架构文档 -├── pyproject.toml # Python 项目依赖(Poetry) -├── __init__.py -│ -├── config/ # 配置管理 -│ ├── __init__.py -│ └── settings.py # 读取 ../../env.yaml,Pydantic 校验 -│ -├── data/ # 数据访问层(只读 TimescaleDB) -│ ├── __init__.py -│ ├── db.py # asyncpg 连接池管理 -│ ├── reader.py # K 线查询方法封装 -│ └── models.py # Python 数据模型(镜像 TypeORM 实体) -│ -├── strategy/ # 策略实现 -│ ├── __init__.py -│ ├── base.py # BaseStrategy 抽象基类 -│ ├── ma_cross.py # 双均线交叉策略示例 -│ └── grid.py # 网格交易策略示例 -│ -├── backtest/ # 回测引擎 -│ ├── __init__.py -│ ├── engine.py # 事件驱动回测引擎 -│ └── metrics.py # 绩效指标计算(夏普比率、最大回撤等) -│ -├── signals/ # 信号分发 -│ ├── __init__.py -│ └── dispatcher.py # 信号生成与分发 -│ -├── common/ # 公共工具 -│ ├── __init__.py -│ ├── logger.py # loguru 日志配置 -│ └── utils.py # 时间工具、重试装饰器等 -│ -└── tests/ # 单元测试 - ├── __init__.py - ├── test_config.py - ├── test_reader.py - └── test_strategy.py -``` - ---- - -## 3. 配置管理 — 读取 env.yaml - -### 3.1 设计思路 - -engine 模块与 data 模块共享项目根目录的 `env.yaml`。Python 侧通过 `PyYAML` 解析,`Pydantic` 进行运行时校验——这与 TypeScript 侧 `data/config/index.ts` 使用 `yaml` + `zod` 的模式完全对称。 - -### 3.2 配置文件结构回顾 - -`env.yaml`(项目根目录,data 与 engine 共享): - -```yaml -# --- TimescaleDB / PostgreSQL 连接 --- -db: - host: localhost - port: 5432 - name: trade - user: trader - password: fucketh - -# --- Redis 连接 --- -redis: - url: redis://localhost:6379 - publish_enabled: true - -# --- 日志 --- -logging: - level: debug - node_env: development -``` - -### 3.3 Python 实现 - -```python -# engine/config/settings.py -"""中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。""" - -from pathlib import Path -from typing import Literal - -import yaml -from pydantic import BaseModel, Field, field_validator - - -# ============================================================ -# 1. Pydantic 校验模型(镜像 TypeScript config/validators.ts) -# ============================================================ - -class DbConfig(BaseModel): - """TimescaleDB / PostgreSQL 连接参数""" - host: str = "localhost" - port: int = Field(default=5432, ge=1, le=65535) - name: str # 对应 YAML 中的 name - user: str - password: str - - -class RedisConfig(BaseModel): - """Redis 连接配置""" - url: str = "redis://localhost:6379" - publish_enabled: bool = True - - -class LoggingConfig(BaseModel): - """日志配置""" - level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info" - node_env: Literal["development", "production"] = "development" - - -class EnvConfig(BaseModel): - """env.yaml 顶层结构""" - db: DbConfig - redis: RedisConfig - logging: LoggingConfig - - -# ============================================================ -# 2. 定位并加载 env.yaml -# ============================================================ - -def _get_project_root() -> Path: - """ - 计算项目根目录的绝对路径。 - engine/config/settings.py → engine/config/ → engine/ → / - """ - return Path(__file__).resolve().parent.parent.parent - - -def _load_yaml_config() -> dict: - """读取项目根目录 env.yaml,返回原始字典。文件不存在时抛出明确错误。""" - root = _get_project_root() - yaml_path = root / "env.yaml" - - if not yaml_path.exists(): - raise FileNotFoundError( - f"[config] 无法读取配置文件: {yaml_path}\n" - f"请确保项目根目录存在 env.yaml。" - ) - - with open(yaml_path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) - - if data is None: - raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}") - - return data - - -# ============================================================ -# 3. 加载 & 校验 & 导出 -# ============================================================ - -_raw = _load_yaml_config() -_config = EnvConfig.model_validate(_raw) - -# --- 按职责分组的导出配置对象 --- - -# TimescaleDB 连接参数(可直接用于 asyncpg.create_pool) -db = _config.db - -# Redis 连接参数 -redis = _config.redis - -# 日志参数 -logging = _config.logging - -# --- asyncpg DSN(便捷构造)--- -DB_DSN = ( - f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" -) - - -def print_config_summary() -> None: - """打印脱敏后的配置概要(不含密码明文)""" - print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}") - print(f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}") - print(f"[config] Logging: level={logging.level}, env={logging.node_env}") -``` - -### 3.4 与 TypeScript 侧对比 - -| 步骤 | TypeScript (`data/config/index.ts`) | Python (`engine/config/settings.py`) | -|------|--------------------------------------|--------------------------------------| -| **路径计算** | `fileURLToPath(import.meta.url)` → `../..` | `Path(__file__).resolve().parent.parent.parent` | -| **YAML 解析** | `parse()` from `yaml` npm 包 | `yaml.safe_load()` from PyYAML | -| **类型校验** | `validateConfig()` 使用 `zod` | `EnvConfig.model_validate()` 使用 Pydantic | -| **导出方式** | 分组 `const` 对象 (`pgsql`, `redis`, `logging`) | 分组模块级变量 (`db`, `redis`, `logging`) | -| **DSN 构造** | 不构造,由 TypeORM DataSource 使用拆解字段 | 额外导出 `DB_DSN` 供 asyncpg 使用 | - ---- - -## 4. 数据读取 — TimescaleDB K 线查询 - -### 4.1 连接池管理 - -```python -# engine/data/db.py -"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。""" - -import asyncpg -from engine.config.settings import db as db_config, DB_DSN - -# 模块级连接池(单例) -_pool: asyncpg.Pool | None = None - - -async def get_pool() -> asyncpg.Pool: - """获取或创建 asyncpg 连接池(懒初始化,复用)。""" - global _pool - if _pool is None: - _pool = await asyncpg.create_pool( - dsn=DB_DSN, - min_size=2, - max_size=10, - command_timeout=30, # 查询超时 30 秒 - # 只读模式:在连接级别设置,防止误写 - server_settings={"default_transaction_read_only": "on"}, - ) - return _pool - - -async def close_pool() -> None: - """关闭连接池(应用退出时调用)""" - global _pool - if _pool: - await _pool.close() - _pool = None -``` - -### 4.2 K 线读取器 - -```python -# engine/data/reader.py -""" -K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。 - -所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。 -参考:data/db/entities/kline.entity.ts -""" - -from datetime import datetime -from typing import Optional, Sequence - -import asyncpg - -from engine.data.db import get_pool -from engine.data.models import KlineRecord - - -class KlineReader: - """TimescaleDB K 线只读查询器""" - - def __init__(self): - self._pool: Optional[asyncpg.Pool] = None - - async def _ensure_pool(self) -> asyncpg.Pool: - if self._pool is None: - self._pool = await get_pool() - return self._pool - - # ---------------------------------------------------------- - # 核心查询方法 - # ---------------------------------------------------------- - - async def get_klines( - self, - symbol: str, - interval: str, - start_time: datetime, - end_time: datetime, - exchange: str = "binance", - limit: int = 1000, - ) -> list[KlineRecord]: - """ - 查询指定时间范围内的 K 线数据。 - - 字段映射 — data/db/entities/kline.entity.ts: - time → Kline.time (timestamptz) - exchange → Kline.exchange (text) - symbol → Kline.symbol (text) - interval → Kline.interval (text) - open → Kline.open (numeric) - high → Kline.high (numeric) - low → Kline.low (numeric) - close → Kline.close (numeric) - volume → Kline.volume (numeric) - quote_volume → Kline.quote_volume (nullable) - trade_count → Kline.trade_count (nullable) - is_closed → Kline.is_closed (boolean) - """ - pool = await self._ensure_pool() - - query = """ - SELECT - time, - exchange, - symbol, - interval, - open, - high, - low, - close, - volume, - quote_volume, - taker_buy_base_vol, - taker_buy_quote_vol, - trade_count, - is_closed - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - AND time >= $4 - AND time <= $5 - ORDER BY time ASC - LIMIT $6 - """ - - rows: Sequence[asyncpg.Record] = await pool.fetch( - query, - exchange, - symbol, - interval, - start_time, - end_time, - limit, - ) - - return [KlineRecord.from_record(row) for row in rows] - - async def get_latest_klines( - self, - symbol: str, - interval: str, - exchange: str = "binance", - limit: int = 500, - ) -> list[KlineRecord]: - """ - 获取最近 N 根已闭合的 K 线(策略启动时快速预热)。 - - 仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。 - """ - pool = await self._ensure_pool() - - query = """ - SELECT - time, exchange, symbol, interval, - open, high, low, close, volume, - quote_volume, taker_buy_base_vol, taker_buy_quote_vol, - trade_count, is_closed - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - AND is_closed = TRUE - ORDER BY time DESC - LIMIT $4 - """ - - rows = await pool.fetch(query, exchange, symbol, interval, limit) - - # 正序返回(时间升序) - records = [KlineRecord.from_record(row) for row in rows] - records.reverse() - return records - - async def get_ohlcv_array( - self, - symbol: str, - interval: str, - exchange: str = "binance", - limit: int = 200, - ) -> list[tuple[datetime, float, float, float, float, float]]: - """ - 获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。 - - 返回格式: [(timestamp, open, high, low, close, volume), ...] - 时间升序排列。 - """ - pool = await self._ensure_pool() - - query = """ - SELECT time, open, high, low, close, volume - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - ORDER BY time DESC - LIMIT $4 - """ - - rows = await pool.fetch(query, exchange, symbol, interval, limit) - - return [ - (row["time"], row["open"], row["high"], row["low"], row["close"], row["volume"]) - for row in reversed(rows) - ] - - async def get_available_symbols( - self, - exchange: str = "binance", - ) -> list[str]: - """查询 TimescaleDB 中有 K 线数据的交易对列表。""" - pool = await self._ensure_pool() - - rows = await pool.fetch( - """ - SELECT DISTINCT symbol - FROM klines - WHERE exchange = $1 - ORDER BY symbol - """, - exchange, - ) - - return [row["symbol"] for row in rows] -``` - -### 4.3 查询模式与索引对齐 - -Kline 实体的复合主键 `(exchange, symbol, interval, time)` 决定了最常用的查询模式。以下查询均能命中主键索引: - -| 查询场景 | SQL 条件 | 索引命中 | -|---------|---------|---------| -| 回测:某交易对某周期的时间范围 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND time>=$4 AND time<=$5 ORDER BY time` | ✅ 主键索引前导列精确匹配 | -| 最新 K 线:已闭合的最近 N 根 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND is_closed=TRUE ORDER BY time DESC LIMIT N` | ✅ 主键索引 + 顺序扫描(LIMIT 小) | -| 可用交易对枚举 | `SELECT DISTINCT symbol WHERE exchange=$1` | ⚠️ 全表扫描 — 建议从 `trading_pairs` 表获取 | - -> **性能建议**:`get_available_symbols()` 应优先查询 `trading_pairs` 表(关系数据,TypeORM 管理域),而非扫描 `klines` 时序表。 - ---- - -## 5. 实体映射 — Python ↔ TypeORM 类型对齐 - -### 5.1 映射策略 - -Python 侧的数据模型镜像 TypeScript 侧 TypeORM 实体,保证双端数据结构一致性。映射规则: - -``` -TypeScript (TypeORM Entity) → Python (Pydantic Model) -════════════════════════════ ════════════════════════ -@PrimaryColumn() timestamptz → datetime -@Column("text") → str -@Column("numeric", 20, 8) → float (Python float = C double, 足够 20 位精度) -@Column("boolean") → bool -@Column("integer") → int -@Column({ nullable: true }) → Optional[...] -``` - -### 5.2 Python 数据模型 - -```python -# engine/data/models.py -""" -Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。 - -命名约定: - - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair) - - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case - - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示 -""" - -from datetime import datetime -from typing import Optional - -from pydantic import BaseModel, Field - - -# ============================================================ -# Kline 模型 — 镜像 data/db/entities/kline.entity.ts -# ============================================================ - -class KlineRecord(BaseModel): - """ - K 线数据记录。 - - 映射关系: - TypeORM Kline.time → KlineRecord.time - TypeORM Kline.exchange → KlineRecord.exchange - TypeORM Kline.symbol → KlineRecord.symbol - TypeORM Kline.interval → KlineRecord.interval - TypeORM Kline.open → KlineRecord.open - TypeORM Kline.high → KlineRecord.high - TypeORM Kline.low → KlineRecord.low - TypeORM Kline.close → KlineRecord.close - TypeORM Kline.volume → KlineRecord.volume - TypeORM Kline.quote_volume → KlineRecord.quote_volume - TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol - TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol - TypeORM Kline.trade_count → KlineRecord.trade_count - TypeORM Kline.is_closed → KlineRecord.is_closed - """ - - time: datetime - exchange: str - symbol: str - interval: str - - # OHLCV - open: float - high: float - low: float - close: float - volume: float - - # 扩展字段(nullable → Optional) - quote_volume: Optional[float] = None - taker_buy_base_vol: Optional[float] = None - taker_buy_quote_vol: Optional[float] = None - trade_count: Optional[int] = None - - # 状态 - is_closed: bool = True - - @classmethod - def from_record(cls, record) -> "KlineRecord": - """ - 从 asyncpg.Record 构造 KlineRecord 实例。 - - asyncpg 返回的 Record 对象行为类似 dict, - 数值列自动转为 Python float/int,timestamptz 自动转为 datetime。 - """ - return cls( - time=record["time"], - exchange=record["exchange"], - symbol=record["symbol"], - interval=record["interval"], - open=float(record["open"]), - high=float(record["high"]), - low=float(record["low"]), - close=float(record["close"]), - volume=float(record["volume"]), - quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None, - taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None, - taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None, - trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None, - is_closed=bool(record["is_closed"]), - ) - - -# ============================================================ -# TradingPair 模型 — 镜像 data/db/entities/trading-pair.entity.ts -# ============================================================ - -class TradingPairInfo(BaseModel): - """ - 交易对配置信息(轻量版,仅包含策略决策所需的字段)。 - - 完整实体参考 data/db/entities/trading-pair.entity.ts - """ - symbol: str - exchange: str - base_asset: str - quote_asset: str - price_precision: int - quantity_precision: int - min_qty: Optional[float] = None - min_notional: Optional[float] = None - active: bool = True - - -# ============================================================ -# 策略信号模型 -# ============================================================ - -class Signal(BaseModel): - """ - 交易信号。 - - 策略 generate_signal() 方法的返回值, - 由 BaseStrategy.do_action() 消费执行。 - """ - symbol: str - signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$") - price: Optional[float] = None # 限价单价格(None = 市价单) - quantity: Optional[float] = None # 下单数量(None = 使用风控模块计算的默认值) - reason: str = "" # 信号产生原因(便于日志审计) - timestamp: datetime = Field(default_factory=datetime.utcnow) -``` - -### 5.3 字段对应速查表 - -| TypeORM Entity (`data/db/entities/kline.entity.ts`) | Python Model (`engine/data/models.py`) | DB 列类型 | Python 类型 | -|-----------------------------------------------------|----------------------------------------|----------|------------| -| `time` (PrimaryColumn, timestamptz) | `KlineRecord.time` | `TIMESTAMPTZ` | `datetime` | -| `exchange` (PrimaryColumn, text) | `KlineRecord.exchange` | `TEXT` | `str` | -| `symbol` (PrimaryColumn, text) | `KlineRecord.symbol` | `TEXT` | `str` | -| `interval` (PrimaryColumn, text) | `KlineRecord.interval` | `TEXT` | `str` | -| `open` (numeric 20,8) | `KlineRecord.open` | `NUMERIC(20,8)` | `float` | -| `high` (numeric 20,8) | `KlineRecord.high` | `NUMERIC(20,8)` | `float` | -| `low` (numeric 20,8) | `KlineRecord.low` | `NUMERIC(20,8)` | `float` | -| `close` (numeric 20,8) | `KlineRecord.close` | `NUMERIC(20,8)` | `float` | -| `volume` (numeric 20,8) | `KlineRecord.volume` | `NUMERIC(20,8)` | `float` | -| `quote_volume` (nullable) | `KlineRecord.quote_volume` | `NUMERIC(20,8)` | `Optional[float]` | -| `taker_buy_base_vol` (nullable) | `KlineRecord.taker_buy_base_vol` | `NUMERIC(20,8)` | `Optional[float]` | -| `taker_buy_quote_vol` (nullable) | `KlineRecord.taker_buy_quote_vol` | `NUMERIC(20,8)` | `Optional[float]` | -| `trade_count` (nullable) | `KlineRecord.trade_count` | `INTEGER` | `Optional[int]` | -| `is_closed` | `KlineRecord.is_closed` | `BOOLEAN` | `bool` | - ---- - -## 6. 策略基类设计 - -```python -# engine/strategy/base.py -"""策略抽象基类 —— 所有交易策略的父类。""" - -from abc import ABC, abstractmethod -from datetime import datetime -from typing import Optional - -from engine.data.models import KlineRecord, Signal - - -class BaseStrategy(ABC): - """ - 策略基类。 - - 子类必须实现: - - generate_signal(): 基于 K 线数据生成交易信号 - - on_kline(): K 线到达时的回调(可在此更新内部状态) - - 生命周期: - 1. __init__() — 初始化策略参数和内状态 - 2. on_kline() — 每根新 K 线到达时调用(更新内部指标状态) - 3. generate_signal() — 被回测引擎/实盘调度器调用,生成买卖信号 - """ - - def __init__(self, symbol: str, interval: str, **kwargs): - self.symbol = symbol - self.interval = interval - self._klines: list[KlineRecord] = [] # 内部 K 线缓存 - self._last_signal: Optional[Signal] = None - - @abstractmethod - async def generate_signal(self) -> Optional[Signal]: - """ - 基于当前缓存的 K 线数据生成交易信号。 - - Returns: - Signal(BUY/SELL) — 触发交易 - None — 无信号,继续持仓/观望 - """ - ... - - async def on_kline(self, kline: KlineRecord) -> None: - """ - 新 K 线到达回调(默认实现:追加到缓存)。 - - 子类可重写以更新技术指标缓存(如 MA、RSI 中间计算结果)。 - """ - self._klines.append(kline) - - async def warm_up(self, klines: list[KlineRecord]) -> None: - """ - 策略预热:加载历史 K 线,初始化内部状态。 - - 回测引擎在开始回测前调用;实盘调度器在策略启动时调用。 - """ - self._klines = sorted(klines, key=lambda k: k.time) - - @property - def is_ready(self) -> bool: - """策略是否已就绪(缓存了足够的历史 K 线)""" - return len(self._klines) >= self.min_klines_required - - @property - @abstractmethod - def min_klines_required(self) -> int: - """策略正常运行所需的最少 K 线数量(如双均线策略需要 slow_period+1 根)""" - ... -``` - ---- - -## 7. 回测引擎设计 - -```python -# engine/backtest/engine.py -""" -事件驱动回测引擎。 - -核心流程: - 1. 从 TimescaleDB 加载历史 K 线 - 2. 按时间顺序逐根喂给策略 - 3. 收集信号 → 模拟成交 → 计算收益 - 4. 输出绩效报告(夏普比率、最大回撤、胜率等) -""" - -from dataclasses import dataclass, field -from datetime import datetime - -from engine.data.reader import KlineReader -from engine.data.models import KlineRecord, Signal -from engine.strategy.base import BaseStrategy - - -@dataclass -class BacktestResult: - """回测结果""" - symbol: str - interval: str - start_time: datetime - end_time: datetime - total_klines: int - total_signals: int - total_return: float # 总收益率 - annual_return: float # 年化收益率 - max_drawdown: float # 最大回撤 - sharpe_ratio: float # 夏普比率 - win_rate: float # 胜率 - profit_factor: float # 盈亏比 - equity_curve: list[float] = field(default_factory=list) # 权益曲线 - - -class BacktestEngine: - """回测引擎""" - - def __init__(self, reader: KlineReader): - self.reader = reader - - async def run( - self, - strategy: BaseStrategy, - symbol: str, - interval: str, - start_time: datetime, - end_time: datetime, - exchange: str = "binance", - ) -> BacktestResult: - """ - 执行回测。 - - Args: - strategy: 待回测的策略实例 - symbol: 交易对 - interval: K 线周期 - start_time: 回测起始时间 - end_time: 回测结束时间 - exchange: 交易所 - """ - # 1. 加载历史 K 线 - klines = await self.reader.get_klines( - symbol=symbol, - interval=interval, - start_time=start_time, - end_time=end_time, - exchange=exchange, - ) - - if len(klines) == 0: - raise ValueError(f"回测区间内无 K 线数据: {symbol} {interval} [{start_time}, {end_time}]") - - # 2. 策略预热 - await strategy.warm_up(klines) - - # 3. 逐根 K 线回放 - signals: list[Signal] = [] - for kline in klines: - await strategy.on_kline(kline) - if strategy.is_ready: - signal = await strategy.generate_signal() - if signal is not None: - signals.append(signal) - - # 4. 计算绩效指标 - return self._compute_metrics( - symbol=symbol, - interval=interval, - start_time=start_time, - end_time=end_time, - klines=klines, - signals=signals, - ) - - def _compute_metrics( - self, - symbol: str, - interval: str, - start_time: datetime, - end_time: datetime, - klines: list[KlineRecord], - signals: list[Signal], - ) -> BacktestResult: - """计算回测绩效指标(框架方法,实际计算委托 metrics.py)""" - from engine.backtest.metrics import compute_metrics - - return compute_metrics( - symbol=symbol, - interval=interval, - start_time=start_time, - end_time=end_time, - klines=klines, - signals=signals, - ) -``` - ---- - -## 8. 最小可运行示例 - -### 8.1 安装依赖 - -```bash -# engine/pyproject.toml -[tool.poetry.dependencies] -python = "^3.10" -asyncpg = "^0.29" -pydantic = "^2.5" -pyyaml = "^6.0" -loguru = "^0.7" -pandas = "^2.0" -numpy = "^1.26" - -[build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" -``` - -```bash -cd engine -poetry install -``` - -### 8.2 运行示例 - -```python -# engine/__main__.py -""" -最小可运行示例 —— 验证 TimescaleDB 连接、配置读取、K 线查询。 - -运行方式: - cd engine - poetry run python -m engine -""" - -import asyncio -from datetime import datetime, timedelta, timezone - -from engine.config.settings import print_config_summary, db as db_config -from engine.data.db import get_pool, close_pool -from engine.data.reader import KlineReader - - -async def main(): - # 1. 打印配置概要(验证 env.yaml 读取) - print_config_summary() - - # 2. 测试 TimescaleDB 连接 - pool = await get_pool() - async with pool.acquire() as conn: - version = await conn.fetchval("SELECT version()") - print(f"[db] TimescaleDB 连接成功: {version}") - - # 3. 查询最近 10 根 BTCUSDT 1h K 线 - reader = KlineReader() - end = datetime.now(timezone.utc) - start = end - timedelta(hours=24) - - klines = await reader.get_klines( - symbol="BTCUSDT", - interval="1h", - start_time=start, - end_time=end, - exchange="binance", - limit=10, - ) - - print(f"\n[data] 查询到 {len(klines)} 根 K 线:") - for k in klines[:5]: - print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} " - f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}") - - # 4. 清理 - await close_pool() - - -if __name__ == "__main__": - asyncio.run(main()) -``` - -### 8.3 完整启动流程 - -```python -# engine/run.py -"""策略引擎完整启动流程示例""" - -import asyncio -from datetime import datetime, timedelta, timezone - -from engine.config.settings import print_config_summary -from engine.data.db import get_pool, close_pool -from engine.data.reader import KlineReader -from engine.strategy.ma_cross import MACrossStrategy # 双均线策略示例 -from engine.backtest.engine import BacktestEngine - - -async def run_backtest(): - print_config_summary() - - reader = KlineReader() - engine = BacktestEngine(reader) - - # 策略参数 - strategy = MACrossStrategy( - symbol="BTCUSDT", - interval="1h", - fast_period=5, # 快线周期 - slow_period=20, # 慢线周期 - ) - - # 回测区间:最近 90 天 - end = datetime.now(timezone.utc) - start = end - timedelta(days=90) - - print(f"\n[backtest] 开始回测: {strategy.symbol} {strategy.interval} " - f"[{start.date()} ~ {end.date()}]") - - result = await engine.run( - strategy=strategy, - symbol="BTCUSDT", - interval="1h", - start_time=start, - end_time=end, - ) - - # 输出绩效 - print(f"\n{'='*50}") - print(f"回测绩效报告") - print(f"{'='*50}") - print(f" K 线总数: {result.total_klines}") - print(f" 交易信号数: {result.total_signals}") - print(f" 总收益率: {result.total_return:.2%}") - print(f" 年化收益率: {result.annual_return:.2%}") - print(f" 最大回撤: {result.max_drawdown:.2%}") - print(f" 夏普比率: {result.sharpe_ratio:.2f}") - print(f" 胜率: {result.win_rate:.2%}") - print(f" 盈亏比: {result.profit_factor:.2f}") - print(f"{'='*50}") - - await close_pool() - - -if __name__ == "__main__": - asyncio.run(run_backtest()) -``` - ---- - -## 9. 性能考量 - -### 9.1 查询优化 - -| 场景 | 优化策略 | 效果 | -|------|---------|------| -| **大数据量回测**(> 10 万根 K 线) | 使用 `asyncpg.Cursor` 流式读取,避免一次性加载到内存 | 内存峰值从 O(n) 降到 O(batch_size) | -| **多币种并发查询** | `asyncio.gather()` 并行查询多个 symbol | N 个币种查询时间 ≈ max(单币种时间),而非 sum | -| **重复区间查询** | 策略层缓存已查询的 K 线,避免重复 DB 查询 | 减少 90% 的重复 I/O | -| **高周期 K 线** | 若 TimescaleDB 配置了连续聚合视图,直接查询物化视图而非原始 1m 表 | 查询速度提升 10-100× | - -### 9.2 内存管理 - -```python -# 流式读取大数据量 K 线(避免 OOM) -async def get_klines_streaming( - pool: asyncpg.Pool, - symbol: str, - interval: str, - start_time: datetime, - end_time: datetime, - batch_size: int = 5000, -): - """流式读取 K 线,每次返回 batch_size 条,适合百万级回测。""" - async with pool.acquire() as conn: - async with conn.transaction(): - cursor = await conn.cursor( - """ - SELECT * FROM klines - WHERE exchange=$1 AND symbol=$2 AND interval=$3 - AND time>=$4 AND time<=$5 - ORDER BY time ASC - """ - ) - # 省略参数绑定细节... - async for row in cursor: - yield KlineRecord.from_record(row) -``` - -### 9.3 向量化计算建议 - -策略中的技术指标计算(MA、RSI、MACD 等)若涉及全量历史数据,应使用 `pandas` + `numpy` 向量化计算,而非 Python 循环: - -```python -import pandas as pd -import numpy as np - -def compute_sma_vectorized(klines: list[KlineRecord], period: int) -> np.ndarray: - """向量化计算简单移动平均 — 比纯 Python 循环快 50-100×""" - closes = np.array([k.close for k in klines], dtype=np.float64) - return pd.Series(closes).rolling(window=period).mean().to_numpy() -``` - ---- - -## 10. 风险提示 - -> ⚠️ **风险声明**:数字货币交易具有高风险,本系统仅提供技术执行工具。策略盈亏由使用者自负,开发者不做任何收益承诺。 - -### 10.1 数据相关风险 - -| 风险 | 说明 | 缓解措施 | -|------|------|---------| -| **数据缺口** | TimescaleDB 中 K 线可能因 data 模块采集中断而存在缺口 | 策略内检测 `time` 间隔,缺口处跳过或标记 | -| **延迟数据** | 网络延迟导致最新 K 线不完整 | `is_closed=False` 的 K 线不应参与信号计算 | -| **精度损失** | Python `float`(IEEE 754 double)约 15-17 位有效数字,对标 `NUMERIC(20,8)` 足够,但极值场景需使用 `Decimal` | 价格/成交量使用 `float`;累积资金计算使用 `Decimal` | - -### 10.2 策略相关风险 - -| 风险 | 说明 | 缓解措施 | -|------|------|---------| -| **过拟合** | 参数在回测区间表现优异但实盘失效 | 使用 Walk-Forward 分析,保留样本外验证集 | -| **未来信息泄露** | 回测中不当使用了当前时间点之后的数据 | `generate_signal()` 仅允许访问截止当前 K 线的历史数据 | -| **滑点与手续费** | 回测未考虑实际交易成本 | 回测引擎中配置滑点模型和手续费率 | - -### 10.3 跨模块一致性 - -| 检查项 | 验证方法 | -|--------|---------| -| Python 模型字段与 TypeORM 实体对齐 | 对照 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts) 逐一核对 | -| env.yaml 配置项双端一致 | 对照 [`data/config/index.ts`](../data/config/index.ts) 的 `EnvConfig` 类型 | -| K 线查询 SQL 的列名与实体一致 | 运行 `__main__.py` 示例验证查询返回数据 | - ---- - -## 附录 A:依赖清单 - -```toml -# engine/pyproject.toml -[tool.poetry] -name = "trade-engine" -version = "0.1.0" -description = "数字货币量化交易系统 - Python 策略引擎" -authors = ["Trade Team"] - -[tool.poetry.dependencies] -python = "^3.10" -asyncpg = "^0.29" # 异步 PostgreSQL 驱动(TimescaleDB 兼容) -pydantic = "^2.5" # 运行时类型校验(与 TypeScript Zod 对称) -pyyaml = "^6.0" # 解析 env.yaml(与 TypeScript yaml 包对称) -loguru = "^0.7" # 结构化日志 -pandas = "^2.0" # 技术指标向量化计算 -numpy = "^1.26" # 数值计算基础 - -[tool.poetry.group.dev.dependencies] -pytest = "^8.0" -pytest-asyncio = "^0.23" -mypy = "^1.8" - -[build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" -``` - ---- - -## 附录 B:TypeScript 实体速查 - -> 以下为 data 模块实体的关键字段摘要,供 Python 开发时快速对照。 -> 完整定义见 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts)。 - -| 实体 | 表名 | 主键 | 备注 | -|------|------|------|------| -| `Kline` | `klines` | `(exchange, symbol, interval, time)` | TimescaleDB hypertable,复合主键 | -| `Exchange` | `exchanges` | `id` (UUID) | TypeORM 管理域,继承 CommonBaseEntity | -| `TradingPair` | `trading_pairs` | `id` (UUID) | TypeORM 管理域,外键关联 exchanges | diff --git a/engine/config/__init__.py b/engine/config/__init__.py deleted file mode 100644 index 2358193..0000000 --- a/engine/config/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from engine.config.settings import ( # noqa: F401 - DB_DSN, - DbConfig, - EnvConfig, - LoggingConfig, - RedisConfig, - db, - logging, - print_config_summary, - redis, -) diff --git a/engine/config/settings.py b/engine/config/settings.py deleted file mode 100644 index 09131c9..0000000 --- a/engine/config/settings.py +++ /dev/null @@ -1,81 +0,0 @@ -"""中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。""" - -from pathlib import Path -from typing import Literal - -import yaml -from pydantic import BaseModel, Field - - -class DbConfig(BaseModel): - """TimescaleDB / PostgreSQL 连接参数""" - - host: str = "localhost" - port: int = Field(default=5432, ge=1, le=65535) - name: str - user: str - password: str - - -class RedisConfig(BaseModel): - """Redis 连接配置""" - - url: str = "redis://localhost:6379" - publish_enabled: bool = True - - -class LoggingConfig(BaseModel): - """日志配置""" - - level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info" - node_env: Literal["development", "production"] = "development" - - -class EnvConfig(BaseModel): - """env.yaml 顶层结构""" - - db: DbConfig - redis: RedisConfig - logging: LoggingConfig - - -def _get_project_root() -> Path: - return Path(__file__).resolve().parent.parent.parent - - -def _load_yaml_config() -> dict: - root = _get_project_root() - yaml_path = root / "env.yaml" - - if not yaml_path.exists(): - raise FileNotFoundError( - f"[config] 无法读取配置文件: {yaml_path}\n" - f"请确保项目根目录存在 env.yaml。" - ) - - with open(yaml_path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) - - if data is None: - raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}") - - return data - - -_raw = _load_yaml_config() -_config = EnvConfig.model_validate(_raw) - -db = _config.db -redis = _config.redis -logging = _config.logging - -DB_DSN = f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" - - -def print_config_summary() -> None: - """打印脱敏后的配置概要(不含密码明文)""" - print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}") - print( - f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}" - ) - print(f"[config] Logging: level={logging.level}, env={logging.node_env}") diff --git a/engine/data/__init__.py b/engine/data/__init__.py deleted file mode 100644 index 6634846..0000000 --- a/engine/data/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from engine.data.db import close_pool, get_pool # noqa: F401 -from engine.data.models import KlineRecord, Signal, TradingPairInfo # noqa: F401 -from engine.data.reader import KlineReader # noqa: F401 diff --git a/engine/data/db.py b/engine/data/db.py deleted file mode 100644 index 8816d7c..0000000 --- a/engine/data/db.py +++ /dev/null @@ -1,28 +0,0 @@ -"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。""" - -import asyncpg -from engine.config.settings import DB_DSN, db as db_config - -_pool: asyncpg.Pool | None = None - - -async def get_pool() -> asyncpg.Pool: - """获取或创建 asyncpg 连接池(懒初始化,复用)。""" - global _pool - if _pool is None: - _pool = await asyncpg.create_pool( - dsn=DB_DSN, - min_size=2, - max_size=10, - command_timeout=30, - server_settings={"default_transaction_read_only": "on"}, - ) - return _pool - - -async def close_pool() -> None: - """关闭连接池(应用退出时调用)""" - global _pool - if _pool: - await _pool.close() - _pool = None diff --git a/engine/data/models.py b/engine/data/models.py deleted file mode 100644 index 0f4714e..0000000 --- a/engine/data/models.py +++ /dev/null @@ -1,100 +0,0 @@ -""" -Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。 - -命名约定: - - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair) - - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case - - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示 -""" - -from datetime import datetime -from typing import Optional - -from pydantic import BaseModel, Field - - -class KlineRecord(BaseModel): - """ - K 线数据记录。 - - 映射关系: - TypeORM Kline.time → KlineRecord.time - TypeORM Kline.exchange → KlineRecord.exchange - TypeORM Kline.symbol → KlineRecord.symbol - TypeORM Kline.interval → KlineRecord.interval - TypeORM Kline.open → KlineRecord.open - TypeORM Kline.high → KlineRecord.high - TypeORM Kline.low → KlineRecord.low - TypeORM Kline.close → KlineRecord.close - TypeORM Kline.volume → KlineRecord.volume - TypeORM Kline.quote_volume → KlineRecord.quote_volume - TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol - TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol - TypeORM Kline.trade_count → KlineRecord.trade_count - TypeORM Kline.is_closed → KlineRecord.is_closed - """ - - time: datetime - exchange: str - symbol: str - interval: str - - open: float - high: float - low: float - close: float - volume: float - - quote_volume: Optional[float] = None - taker_buy_base_vol: Optional[float] = None - taker_buy_quote_vol: Optional[float] = None - trade_count: Optional[int] = None - is_closed: bool = True - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None - - @classmethod - def from_record(cls, record) -> "KlineRecord": - return cls( - time=record["time"], - exchange=record["exchange"], - symbol=record["symbol"], - interval=record["interval"], - open=float(record["open"]), - high=float(record["high"]), - low=float(record["low"]), - close=float(record["close"]), - volume=float(record["volume"]), - quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None, - taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None, - taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None, - trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None, - is_closed=bool(record["is_closed"]), - created_at=record["created_at"] if "created_at" in record else None, - updated_at=record["updated_at"] if "updated_at" in record else None, - ) - - -class TradingPairInfo(BaseModel): - """交易对配置信息(轻量版,仅包含策略决策所需的字段)""" - - symbol: str - exchange: str - base_asset: str - quote_asset: str - price_precision: int - quantity_precision: int - min_qty: Optional[float] = None - min_notional: Optional[float] = None - active: bool = True - - -class Signal(BaseModel): - """交易信号""" - - symbol: str - signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$") - price: Optional[float] = None - quantity: Optional[float] = None - reason: str = "" - timestamp: datetime = Field(default_factory=datetime.utcnow) diff --git a/engine/data/reader.py b/engine/data/reader.py deleted file mode 100644 index 301409d..0000000 --- a/engine/data/reader.py +++ /dev/null @@ -1,278 +0,0 @@ -""" -K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。 - -所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。 -参考:data/db/entities/kline.entity.ts -""" - -from datetime import datetime -from typing import Optional, Sequence - -import asyncpg - -from engine.data.db import get_pool -from engine.data.models import KlineRecord - - -class KlineReader: - """TimescaleDB K 线只读查询器""" - - def __init__(self): - self._pool: Optional[asyncpg.Pool] = None - - async def _ensure_pool(self) -> asyncpg.Pool: - if self._pool is None: - self._pool = await get_pool() - return self._pool - - async def get_klines( - self, - symbol: str, - interval: str, - start_time: datetime, - end_time: datetime, - exchange: str = "binance", - limit: int = 1000, - ) -> list[KlineRecord]: - """ - 查询指定时间范围内的 K 线数据,支持任意周期(1m / 5m / 15m / 1h / 4h / 1d 等)。 - - Args: - symbol: 交易对(如 BTCUSDT) - interval: K 线周期(1m / 5m / 15m / 30m / 1h / 4h / 1d 等) - start_time: 起始时间(含) - end_time: 结束时间(含) - exchange: 交易所标识 - limit: 最大返回条数 - """ - pool = await self._ensure_pool() - - query = """ - SELECT - time, exchange, symbol, interval, - open, high, low, close, volume, - quote_volume, taker_buy_base_vol, taker_buy_quote_vol, - trade_count, is_closed, created_at, updated_at - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - AND time >= $4 - AND time <= $5 - ORDER BY time ASC - LIMIT $6 - """ - - rows: Sequence[asyncpg.Record] = await pool.fetch( - query, - exchange, - symbol, - interval, - start_time, - end_time, - limit, - ) - - return [KlineRecord.from_record(row) for row in rows] - - async def get_latest_klines( - self, - symbol: str, - interval: str, - exchange: str = "binance", - limit: int = 500, - ) -> list[KlineRecord]: - """ - 获取最近 N 根已闭合的 K 线(策略启动时快速预热)。 - - 仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。 - """ - pool = await self._ensure_pool() - - query = """ - SELECT - time, exchange, symbol, interval, - open, high, low, close, volume, - quote_volume, taker_buy_base_vol, taker_buy_quote_vol, - trade_count, is_closed, created_at, updated_at - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - AND is_closed = TRUE - ORDER BY time DESC - LIMIT $4 - """ - - rows = await pool.fetch(query, exchange, symbol, interval, limit) - - records = [KlineRecord.from_record(row) for row in rows] - records.reverse() - return records - - async def get_klines_by_count( - self, - symbol: str, - interval: str, - count: int = 100, - exchange: str = "binance", - before_time: Optional[datetime] = None, - ) -> list[KlineRecord]: - """ - 获取最新的 N 根 K 线(按时间倒序取 N 条再正序返回)。 - - 适合策略运行时获取最近 N 根 K 线做指标计算,不关心特定时间范围。 - - Args: - symbol: 交易对 - interval: K 线周期 - count: 需要获取的 K 线数量 - exchange: 交易所标识 - before_time: 可选,只获取该时间之前的 K 线 - """ - pool = await self._ensure_pool() - - if before_time: - query = """ - SELECT - time, exchange, symbol, interval, - open, high, low, close, volume, - quote_volume, taker_buy_base_vol, taker_buy_quote_vol, - trade_count, is_closed, created_at, updated_at - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - AND time <= $4 - ORDER BY time DESC - LIMIT $5 - """ - rows = await pool.fetch(query, exchange, symbol, interval, before_time, count) - else: - query = """ - SELECT - time, exchange, symbol, interval, - open, high, low, close, volume, - quote_volume, taker_buy_base_vol, taker_buy_quote_vol, - trade_count, is_closed, created_at, updated_at - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - ORDER BY time DESC - LIMIT $4 - """ - rows = await pool.fetch(query, exchange, symbol, interval, count) - - records = [KlineRecord.from_record(row) for row in rows] - records.reverse() - return records - - async def get_ohlcv_array( - self, - symbol: str, - interval: str, - exchange: str = "binance", - limit: int = 200, - before_time: Optional[datetime] = None, - ) -> list[tuple[datetime, float, float, float, float, float]]: - """ - 获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。 - - 返回格式: [(timestamp, open, high, low, close, volume), ...] - 时间升序排列。 - """ - pool = await self._ensure_pool() - - if before_time: - query = """ - SELECT time, open, high, low, close, volume - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - AND time <= $4 - ORDER BY time DESC - LIMIT $5 - """ - rows = await pool.fetch(query, exchange, symbol, interval, before_time, limit) - else: - query = """ - SELECT time, open, high, low, close, volume - FROM klines - WHERE exchange = $1 - AND symbol = $2 - AND interval = $3 - ORDER BY time DESC - LIMIT $4 - """ - rows = await pool.fetch(query, exchange, symbol, interval, limit) - - return [ - (row["time"], float(row["open"]), float(row["high"]), float(row["low"]), float(row["close"]), float(row["volume"])) - for row in reversed(rows) - ] - - async def get_available_symbols( - self, - exchange: str = "binance", - ) -> list[str]: - """ - 查询已激活的交易对列表。 - - 从 trading_pairs 配置表读取(仅 active=TRUE 的记录), - 而非扫描 klines 时序表,避免全表扫描。 - 对应 data/db/entities/trading-pair.entity.ts。 - """ - pool = await self._ensure_pool() - - rows = await pool.fetch( - """ - SELECT tp.symbol - FROM trading_pairs tp - JOIN exchanges e ON tp.exchange_id = e.id - WHERE e.name = $1 - AND tp.active = TRUE - ORDER BY tp.symbol - """, - exchange, - ) - - return [row["symbol"] for row in rows] - - async def get_available_intervals( - self, - symbol: str, - exchange: str = "binance", - ) -> list[str]: - """ - 查询某交易对配置的 K 线周期列表。 - - 从 trading_pairs.kline_intervals 读取(逗号分隔字符串), - 而非扫描 klines 时序表,避免全表扫描。 - 对应 data/db/entities/trading-pair.entity.ts: kline_intervals 字段。 - """ - pool = await self._ensure_pool() - - row = await pool.fetchrow( - """ - SELECT tp.kline_intervals - FROM trading_pairs tp - JOIN exchanges e ON tp.exchange_id = e.id - WHERE e.name = $1 - AND tp.symbol = $2 - AND tp.active = TRUE - """, - exchange, - symbol, - ) - - if row is None or not row["kline_intervals"]: - return [] - - # kline_intervals 格式: "1m,5m,15m,1h,4h,1d" - return [ - interval.strip() - for interval in row["kline_intervals"].split(",") - if interval.strip() - ] diff --git a/engine/env.yaml b/engine/env.yaml new file mode 120000 index 0000000..9e40e9a --- /dev/null +++ b/engine/env.yaml @@ -0,0 +1 @@ +../env.yaml \ No newline at end of file diff --git a/engine/example/__init__.py b/engine/example/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/engine/example/test_db.py b/engine/example/test_db.py deleted file mode 100644 index 2d9f80b..0000000 --- a/engine/example/test_db.py +++ /dev/null @@ -1,44 +0,0 @@ -""" -数据库模块测试示例。 - -运行方式(在项目根目录 trade/ 下): - python -m engine.example.test_db - -前提条件: - docker compose up -d - data 模块已同步过 K 线数据到 TimescaleDB -""" - -import asyncio -from datetime import datetime, timedelta, timezone - -from engine.config import print_config_summary -from engine.data import KlineRecord, KlineReader, close_pool - -async def main(): - print("=" * 60) - print(" Trade Engine — 数据库模块测试") - print("=" * 60) - - print_config_summary() - - reader = KlineReader() - - # 获取最近 100 根 BTCUSDT 5m K 线 - klines = await reader.get_klines_by_count( - symbol="BTCUSDT", - interval="1m", - count=5, - ) - print(f"\n查询到 {len(klines)} 根 K 线:") - for k in klines[:5]: - print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} " - f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}") - if len(klines) > 5: - print(f" ... 剩余 {len(klines) - 5} 根") - - await close_pool() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/engine/pyproject.toml b/engine/pyproject.toml deleted file mode 100644 index dd3fbca..0000000 --- a/engine/pyproject.toml +++ /dev/null @@ -1,14 +0,0 @@ -[project] -name = "trade-engine" -version = "0.1.0" -description = "量化交易策略引擎" -requires-python = ">=3.10" -dependencies = [ - "asyncpg>=0.29", - "pydantic>=2.5", - "pyyaml>=6.0", -] - -[tool.pyright] -venvPath = "." -venv = ".venv"