From 1c9339a4db8cc84b3d22cd55718a3035b9c72e85 Mon Sep 17 00:00:00 2001 From: Rekey Date: Mon, 8 Jun 2026 18:19:50 +0800 Subject: [PATCH] =?UTF-8?q?feat(engine):=20=E6=96=B0=E5=A2=9E=20Python=20?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E5=BC=95=E6=93=8E=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - config/settings.py:Pydantic 解析 env.yaml - data/db.py:asyncpg 连接池管理 - data/reader.py:KlineReader 只读查询 TimescaleDB - data/models.py:KlineRecord 等 Pydantic 模型,镜像 TypeORM 实体 - example/test_db.py:数据库查询验证示例 - README.md:引擎架构文档 --- engine/README.md | 1152 +++++++++++++++++ engine/config/__init__.py | 11 + .../__pycache__/__init__.cpython-314.pyc | Bin 0 -> 389 bytes .../__pycache__/settings.cpython-314.pyc | Bin 0 -> 5744 bytes engine/config/settings.py | 81 ++ engine/data/__init__.py | 3 + .../data/__pycache__/__init__.cpython-314.pyc | Bin 0 -> 383 bytes engine/data/__pycache__/db.cpython-314.pyc | Bin 0 -> 1782 bytes .../data/__pycache__/models.cpython-314.pyc | Bin 0 -> 6083 bytes .../data/__pycache__/reader.cpython-314.pyc | Bin 0 -> 12299 bytes engine/data/db.py | 28 + engine/data/models.py | 100 ++ engine/data/reader.py | 278 ++++ engine/example/__init__.py | 0 .../__pycache__/__init__.cpython-314.pyc | Bin 0 -> 159 bytes .../__pycache__/test_db.cpython-314.pyc | Bin 0 -> 2148 bytes engine/example/test_db.py | 44 + engine/pyproject.toml | 14 + 18 files changed, 1711 insertions(+) create mode 100644 engine/README.md create mode 100644 engine/config/__init__.py create mode 100644 engine/config/__pycache__/__init__.cpython-314.pyc create mode 100644 engine/config/__pycache__/settings.cpython-314.pyc create mode 100644 engine/config/settings.py create mode 100644 engine/data/__init__.py create mode 100644 engine/data/__pycache__/__init__.cpython-314.pyc create mode 100644 engine/data/__pycache__/db.cpython-314.pyc create mode 100644 engine/data/__pycache__/models.cpython-314.pyc create mode 100644 engine/data/__pycache__/reader.cpython-314.pyc create mode 100644 engine/data/db.py create mode 100644 engine/data/models.py create mode 100644 engine/data/reader.py create mode 100644 engine/example/__init__.py create mode 100644 engine/example/__pycache__/__init__.cpython-314.pyc create mode 100644 engine/example/__pycache__/test_db.cpython-314.pyc create mode 100644 engine/example/test_db.py create mode 100644 engine/pyproject.toml diff --git a/engine/README.md b/engine/README.md new file mode 100644 index 0000000..a219ba9 --- /dev/null +++ b/engine/README.md @@ -0,0 +1,1152 @@ +# Engine Module — Python 策略引擎架构文档 + +> **模块定位**:数字货币量化交易系统的业务层,负责策略开发、回测分析、信号生成,从 TimescaleDB 读取 data 模块持久化的 K 线数据进行策略决策。 +> +> **运行时**:Python 3.10+ | **依赖管理**:Poetry / uv | **数据库驱动**:asyncpg | **配置解析**:PyYAML + +--- + +## 目录 + +1. [模块定位与架构边界](#1-模块定位与架构边界) +2. [目录结构](#2-目录结构) +3. [配置管理 — 读取 env.yaml](#3-配置管理--读取-envyaml) +4. [数据读取 — TimescaleDB K 线查询](#4-数据读取--timescaledb-k-线查询) +5. [实体映射 — Python ↔ TypeORM 类型对齐](#5-实体映射--python--typeorm-类型对齐) +6. [策略基类设计](#6-策略基类设计) +7. [回测引擎设计](#7-回测引擎设计) +8. [最小可运行示例](#8-最小可运行示例) +9. [性能考量](#9-性能考量) +10. [风险提示](#10-风险提示) + +--- + +## 1. 模块定位与架构边界 + +### 1.1 系统分层中的位置 + +``` +┌──────────────────────────────────────────────────────────────┐ +│ 🐍 Python Engine (本模块) │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌───────────────────┐ │ +│ │ 策略引擎 │ │ 回测引擎 │ │ 信号分发器 │ │ +│ │ (strategy/) │ │ (backtest/) │ │ (signals/) │ │ +│ └──────┬───────┘ └──────┬───────┘ └────────┬──────────┘ │ +│ │ │ │ │ +│ └─────────────────┼────────────────────┘ │ +│ │ │ +│ ┌────────────────────────▼──────────────────────────────┐ │ +│ │ 数据访问层 (data/) │ │ +│ │ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ TimescaleDB │ │ env.yaml │ │ Redis │ │ │ +│ │ │ Reader │ │ Config │ │ Subscriber │ │ │ +│ │ └─────────────┘ └──────────────┘ └──────────────┘ │ │ +│ └───────────────────────────────────────────────────────┘ │ +└──────────────────────────┬───────────────────────────────────┘ + │ + ┌──────────────────┼──────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ TimescaleDB │ │ env.yaml │ │ Redis │ +│ (data 模块 │ │ (项目根目录) │ │ (行情消息) │ +│ 写入的K线) │ │ │ │ │ +└──────────────┘ └──────────────┘ └──────────────┘ +``` + +### 1.2 与 data 模块的关系 + +| 维度 | data 模块 (TypeScript) | engine 模块 (Python) | +|------|----------------------|---------------------| +| **职责** | 行情采集、K 线合成、数据持久化 | 策略计算、回测、信号生成 | +| **数据库操作** | **写入**(UPSERT K 线到 TimescaleDB) | **只读**(SELECT K 线进行策略分析) | +| **配置来源** | `data/config/index.ts` 读取 `env.yaml` | `engine/config.py` 读取同一份 `env.yaml` | +| **实体定义** | TypeORM 装饰器实体 (`data/db/entities/`) | Pydantic/dataclass 模型(镜像 TypeORM 结构) | +| **运行时** | Bun / Node.js | CPython 3.10+ | + +> **设计原则**:engine 模块对 TimescaleDB **只读**,绝不写入。K 线数据的写入由 data 模块全权负责,避免双端写入导致的数据一致性问题。 + +--- + +## 2. 目录结构 + +``` +engine/ +├── README.md # 本架构文档 +├── pyproject.toml # Python 项目依赖(Poetry) +├── __init__.py +│ +├── config/ # 配置管理 +│ ├── __init__.py +│ └── settings.py # 读取 ../../env.yaml,Pydantic 校验 +│ +├── data/ # 数据访问层(只读 TimescaleDB) +│ ├── __init__.py +│ ├── db.py # asyncpg 连接池管理 +│ ├── reader.py # K 线查询方法封装 +│ └── models.py # Python 数据模型(镜像 TypeORM 实体) +│ +├── strategy/ # 策略实现 +│ ├── __init__.py +│ ├── base.py # BaseStrategy 抽象基类 +│ ├── ma_cross.py # 双均线交叉策略示例 +│ └── grid.py # 网格交易策略示例 +│ +├── backtest/ # 回测引擎 +│ ├── __init__.py +│ ├── engine.py # 事件驱动回测引擎 +│ └── metrics.py # 绩效指标计算(夏普比率、最大回撤等) +│ +├── signals/ # 信号分发 +│ ├── __init__.py +│ └── dispatcher.py # 信号生成与分发 +│ +├── common/ # 公共工具 +│ ├── __init__.py +│ ├── logger.py # loguru 日志配置 +│ └── utils.py # 时间工具、重试装饰器等 +│ +└── tests/ # 单元测试 + ├── __init__.py + ├── test_config.py + ├── test_reader.py + └── test_strategy.py +``` + +--- + +## 3. 配置管理 — 读取 env.yaml + +### 3.1 设计思路 + +engine 模块与 data 模块共享项目根目录的 `env.yaml`。Python 侧通过 `PyYAML` 解析,`Pydantic` 进行运行时校验——这与 TypeScript 侧 `data/config/index.ts` 使用 `yaml` + `zod` 的模式完全对称。 + +### 3.2 配置文件结构回顾 + +`env.yaml`(项目根目录,data 与 engine 共享): + +```yaml +# --- TimescaleDB / PostgreSQL 连接 --- +db: + host: localhost + port: 5432 + name: trade + user: trader + password: fucketh + +# --- Redis 连接 --- +redis: + url: redis://localhost:6379 + publish_enabled: true + +# --- 日志 --- +logging: + level: debug + node_env: development +``` + +### 3.3 Python 实现 + +```python +# engine/config/settings.py +"""中心化配置模块 —— 读取项目根目录 env.yaml,Pydantic 校验导出强类型配置对象。""" + +from pathlib import Path +from typing import Literal + +import yaml +from pydantic import BaseModel, Field, field_validator + + +# ============================================================ +# 1. Pydantic 校验模型(镜像 TypeScript config/validators.ts) +# ============================================================ + +class DbConfig(BaseModel): + """TimescaleDB / PostgreSQL 连接参数""" + host: str = "localhost" + port: int = Field(default=5432, ge=1, le=65535) + name: str # 对应 YAML 中的 name + user: str + password: str + + +class RedisConfig(BaseModel): + """Redis 连接配置""" + url: str = "redis://localhost:6379" + publish_enabled: bool = True + + +class LoggingConfig(BaseModel): + """日志配置""" + level: Literal["trace", "debug", "info", "warn", "error", "fatal"] = "info" + node_env: Literal["development", "production"] = "development" + + +class EnvConfig(BaseModel): + """env.yaml 顶层结构""" + db: DbConfig + redis: RedisConfig + logging: LoggingConfig + + +# ============================================================ +# 2. 定位并加载 env.yaml +# ============================================================ + +def _get_project_root() -> Path: + """ + 计算项目根目录的绝对路径。 + engine/config/settings.py → engine/config/ → engine/ → / + """ + return Path(__file__).resolve().parent.parent.parent + + +def _load_yaml_config() -> dict: + """读取项目根目录 env.yaml,返回原始字典。文件不存在时抛出明确错误。""" + root = _get_project_root() + yaml_path = root / "env.yaml" + + if not yaml_path.exists(): + raise FileNotFoundError( + f"[config] 无法读取配置文件: {yaml_path}\n" + f"请确保项目根目录存在 env.yaml。" + ) + + with open(yaml_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + if data is None: + raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}") + + return data + + +# ============================================================ +# 3. 加载 & 校验 & 导出 +# ============================================================ + +_raw = _load_yaml_config() +_config = EnvConfig.model_validate(_raw) + +# --- 按职责分组的导出配置对象 --- + +# TimescaleDB 连接参数(可直接用于 asyncpg.create_pool) +db = _config.db + +# Redis 连接参数 +redis = _config.redis + +# 日志参数 +logging = _config.logging + +# --- asyncpg DSN(便捷构造)--- +DB_DSN = ( + f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" +) + + +def print_config_summary() -> None: + """打印脱敏后的配置概要(不含密码明文)""" + print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}") + print(f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}") + print(f"[config] Logging: level={logging.level}, env={logging.node_env}") +``` + +### 3.4 与 TypeScript 侧对比 + +| 步骤 | TypeScript (`data/config/index.ts`) | Python (`engine/config/settings.py`) | +|------|--------------------------------------|--------------------------------------| +| **路径计算** | `fileURLToPath(import.meta.url)` → `../..` | `Path(__file__).resolve().parent.parent.parent` | +| **YAML 解析** | `parse()` from `yaml` npm 包 | `yaml.safe_load()` from PyYAML | +| **类型校验** | `validateConfig()` 使用 `zod` | `EnvConfig.model_validate()` 使用 Pydantic | +| **导出方式** | 分组 `const` 对象 (`pgsql`, `redis`, `logging`) | 分组模块级变量 (`db`, `redis`, `logging`) | +| **DSN 构造** | 不构造,由 TypeORM DataSource 使用拆解字段 | 额外导出 `DB_DSN` 供 asyncpg 使用 | + +--- + +## 4. 数据读取 — TimescaleDB K 线查询 + +### 4.1 连接池管理 + +```python +# engine/data/db.py +"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。""" + +import asyncpg +from engine.config.settings import db as db_config, DB_DSN + +# 模块级连接池(单例) +_pool: asyncpg.Pool | None = None + + +async def get_pool() -> asyncpg.Pool: + """获取或创建 asyncpg 连接池(懒初始化,复用)。""" + global _pool + if _pool is None: + _pool = await asyncpg.create_pool( + dsn=DB_DSN, + min_size=2, + max_size=10, + command_timeout=30, # 查询超时 30 秒 + # 只读模式:在连接级别设置,防止误写 + server_settings={"default_transaction_read_only": "on"}, + ) + return _pool + + +async def close_pool() -> None: + """关闭连接池(应用退出时调用)""" + global _pool + if _pool: + await _pool.close() + _pool = None +``` + +### 4.2 K 线读取器 + +```python +# engine/data/reader.py +""" +K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。 + +所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。 +参考:data/db/entities/kline.entity.ts +""" + +from datetime import datetime +from typing import Optional, Sequence + +import asyncpg + +from engine.data.db import get_pool +from engine.data.models import KlineRecord + + +class KlineReader: + """TimescaleDB K 线只读查询器""" + + def __init__(self): + self._pool: Optional[asyncpg.Pool] = None + + async def _ensure_pool(self) -> asyncpg.Pool: + if self._pool is None: + self._pool = await get_pool() + return self._pool + + # ---------------------------------------------------------- + # 核心查询方法 + # ---------------------------------------------------------- + + async def get_klines( + self, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + exchange: str = "binance", + limit: int = 1000, + ) -> list[KlineRecord]: + """ + 查询指定时间范围内的 K 线数据。 + + 字段映射 — data/db/entities/kline.entity.ts: + time → Kline.time (timestamptz) + exchange → Kline.exchange (text) + symbol → Kline.symbol (text) + interval → Kline.interval (text) + open → Kline.open (numeric) + high → Kline.high (numeric) + low → Kline.low (numeric) + close → Kline.close (numeric) + volume → Kline.volume (numeric) + quote_volume → Kline.quote_volume (nullable) + trade_count → Kline.trade_count (nullable) + is_closed → Kline.is_closed (boolean) + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, + exchange, + symbol, + interval, + open, + high, + low, + close, + volume, + quote_volume, + taker_buy_base_vol, + taker_buy_quote_vol, + trade_count, + is_closed + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time >= $4 + AND time <= $5 + ORDER BY time ASC + LIMIT $6 + """ + + rows: Sequence[asyncpg.Record] = await pool.fetch( + query, + exchange, + symbol, + interval, + start_time, + end_time, + limit, + ) + + return [KlineRecord.from_record(row) for row in rows] + + async def get_latest_klines( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 500, + ) -> list[KlineRecord]: + """ + 获取最近 N 根已闭合的 K 线(策略启动时快速预热)。 + + 仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。 + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND is_closed = TRUE + ORDER BY time DESC + LIMIT $4 + """ + + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + # 正序返回(时间升序) + records = [KlineRecord.from_record(row) for row in rows] + records.reverse() + return records + + async def get_ohlcv_array( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 200, + ) -> list[tuple[datetime, float, float, float, float, float]]: + """ + 获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。 + + 返回格式: [(timestamp, open, high, low, close, volume), ...] + 时间升序排列。 + """ + pool = await self._ensure_pool() + + query = """ + SELECT time, open, high, low, close, volume + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + ORDER BY time DESC + LIMIT $4 + """ + + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + return [ + (row["time"], row["open"], row["high"], row["low"], row["close"], row["volume"]) + for row in reversed(rows) + ] + + async def get_available_symbols( + self, + exchange: str = "binance", + ) -> list[str]: + """查询 TimescaleDB 中有 K 线数据的交易对列表。""" + pool = await self._ensure_pool() + + rows = await pool.fetch( + """ + SELECT DISTINCT symbol + FROM klines + WHERE exchange = $1 + ORDER BY symbol + """, + exchange, + ) + + return [row["symbol"] for row in rows] +``` + +### 4.3 查询模式与索引对齐 + +Kline 实体的复合主键 `(exchange, symbol, interval, time)` 决定了最常用的查询模式。以下查询均能命中主键索引: + +| 查询场景 | SQL 条件 | 索引命中 | +|---------|---------|---------| +| 回测:某交易对某周期的时间范围 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND time>=$4 AND time<=$5 ORDER BY time` | ✅ 主键索引前导列精确匹配 | +| 最新 K 线:已闭合的最近 N 根 | `WHERE exchange=$1 AND symbol=$2 AND interval=$3 AND is_closed=TRUE ORDER BY time DESC LIMIT N` | ✅ 主键索引 + 顺序扫描(LIMIT 小) | +| 可用交易对枚举 | `SELECT DISTINCT symbol WHERE exchange=$1` | ⚠️ 全表扫描 — 建议从 `trading_pairs` 表获取 | + +> **性能建议**:`get_available_symbols()` 应优先查询 `trading_pairs` 表(关系数据,TypeORM 管理域),而非扫描 `klines` 时序表。 + +--- + +## 5. 实体映射 — Python ↔ TypeORM 类型对齐 + +### 5.1 映射策略 + +Python 侧的数据模型镜像 TypeScript 侧 TypeORM 实体,保证双端数据结构一致性。映射规则: + +``` +TypeScript (TypeORM Entity) → Python (Pydantic Model) +════════════════════════════ ════════════════════════ +@PrimaryColumn() timestamptz → datetime +@Column("text") → str +@Column("numeric", 20, 8) → float (Python float = C double, 足够 20 位精度) +@Column("boolean") → bool +@Column("integer") → int +@Column({ nullable: true }) → Optional[...] +``` + +### 5.2 Python 数据模型 + +```python +# engine/data/models.py +""" +Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。 + +命名约定: + - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair) + - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case + - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示 +""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +# ============================================================ +# Kline 模型 — 镜像 data/db/entities/kline.entity.ts +# ============================================================ + +class KlineRecord(BaseModel): + """ + K 线数据记录。 + + 映射关系: + TypeORM Kline.time → KlineRecord.time + TypeORM Kline.exchange → KlineRecord.exchange + TypeORM Kline.symbol → KlineRecord.symbol + TypeORM Kline.interval → KlineRecord.interval + TypeORM Kline.open → KlineRecord.open + TypeORM Kline.high → KlineRecord.high + TypeORM Kline.low → KlineRecord.low + TypeORM Kline.close → KlineRecord.close + TypeORM Kline.volume → KlineRecord.volume + TypeORM Kline.quote_volume → KlineRecord.quote_volume + TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol + TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol + TypeORM Kline.trade_count → KlineRecord.trade_count + TypeORM Kline.is_closed → KlineRecord.is_closed + """ + + time: datetime + exchange: str + symbol: str + interval: str + + # OHLCV + open: float + high: float + low: float + close: float + volume: float + + # 扩展字段(nullable → Optional) + quote_volume: Optional[float] = None + taker_buy_base_vol: Optional[float] = None + taker_buy_quote_vol: Optional[float] = None + trade_count: Optional[int] = None + + # 状态 + is_closed: bool = True + + @classmethod + def from_record(cls, record) -> "KlineRecord": + """ + 从 asyncpg.Record 构造 KlineRecord 实例。 + + asyncpg 返回的 Record 对象行为类似 dict, + 数值列自动转为 Python float/int,timestamptz 自动转为 datetime。 + """ + return cls( + time=record["time"], + exchange=record["exchange"], + symbol=record["symbol"], + interval=record["interval"], + open=float(record["open"]), + high=float(record["high"]), + low=float(record["low"]), + close=float(record["close"]), + volume=float(record["volume"]), + quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None, + taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None, + taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None, + trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None, + is_closed=bool(record["is_closed"]), + ) + + +# ============================================================ +# TradingPair 模型 — 镜像 data/db/entities/trading-pair.entity.ts +# ============================================================ + +class TradingPairInfo(BaseModel): + """ + 交易对配置信息(轻量版,仅包含策略决策所需的字段)。 + + 完整实体参考 data/db/entities/trading-pair.entity.ts + """ + symbol: str + exchange: str + base_asset: str + quote_asset: str + price_precision: int + quantity_precision: int + min_qty: Optional[float] = None + min_notional: Optional[float] = None + active: bool = True + + +# ============================================================ +# 策略信号模型 +# ============================================================ + +class Signal(BaseModel): + """ + 交易信号。 + + 策略 generate_signal() 方法的返回值, + 由 BaseStrategy.do_action() 消费执行。 + """ + symbol: str + signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$") + price: Optional[float] = None # 限价单价格(None = 市价单) + quantity: Optional[float] = None # 下单数量(None = 使用风控模块计算的默认值) + reason: str = "" # 信号产生原因(便于日志审计) + timestamp: datetime = Field(default_factory=datetime.utcnow) +``` + +### 5.3 字段对应速查表 + +| TypeORM Entity (`data/db/entities/kline.entity.ts`) | Python Model (`engine/data/models.py`) | DB 列类型 | Python 类型 | +|-----------------------------------------------------|----------------------------------------|----------|------------| +| `time` (PrimaryColumn, timestamptz) | `KlineRecord.time` | `TIMESTAMPTZ` | `datetime` | +| `exchange` (PrimaryColumn, text) | `KlineRecord.exchange` | `TEXT` | `str` | +| `symbol` (PrimaryColumn, text) | `KlineRecord.symbol` | `TEXT` | `str` | +| `interval` (PrimaryColumn, text) | `KlineRecord.interval` | `TEXT` | `str` | +| `open` (numeric 20,8) | `KlineRecord.open` | `NUMERIC(20,8)` | `float` | +| `high` (numeric 20,8) | `KlineRecord.high` | `NUMERIC(20,8)` | `float` | +| `low` (numeric 20,8) | `KlineRecord.low` | `NUMERIC(20,8)` | `float` | +| `close` (numeric 20,8) | `KlineRecord.close` | `NUMERIC(20,8)` | `float` | +| `volume` (numeric 20,8) | `KlineRecord.volume` | `NUMERIC(20,8)` | `float` | +| `quote_volume` (nullable) | `KlineRecord.quote_volume` | `NUMERIC(20,8)` | `Optional[float]` | +| `taker_buy_base_vol` (nullable) | `KlineRecord.taker_buy_base_vol` | `NUMERIC(20,8)` | `Optional[float]` | +| `taker_buy_quote_vol` (nullable) | `KlineRecord.taker_buy_quote_vol` | `NUMERIC(20,8)` | `Optional[float]` | +| `trade_count` (nullable) | `KlineRecord.trade_count` | `INTEGER` | `Optional[int]` | +| `is_closed` | `KlineRecord.is_closed` | `BOOLEAN` | `bool` | + +--- + +## 6. 策略基类设计 + +```python +# engine/strategy/base.py +"""策略抽象基类 —— 所有交易策略的父类。""" + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Optional + +from engine.data.models import KlineRecord, Signal + + +class BaseStrategy(ABC): + """ + 策略基类。 + + 子类必须实现: + - generate_signal(): 基于 K 线数据生成交易信号 + - on_kline(): K 线到达时的回调(可在此更新内部状态) + + 生命周期: + 1. __init__() — 初始化策略参数和内状态 + 2. on_kline() — 每根新 K 线到达时调用(更新内部指标状态) + 3. generate_signal() — 被回测引擎/实盘调度器调用,生成买卖信号 + """ + + def __init__(self, symbol: str, interval: str, **kwargs): + self.symbol = symbol + self.interval = interval + self._klines: list[KlineRecord] = [] # 内部 K 线缓存 + self._last_signal: Optional[Signal] = None + + @abstractmethod + async def generate_signal(self) -> Optional[Signal]: + """ + 基于当前缓存的 K 线数据生成交易信号。 + + Returns: + Signal(BUY/SELL) — 触发交易 + None — 无信号,继续持仓/观望 + """ + ... + + async def on_kline(self, kline: KlineRecord) -> None: + """ + 新 K 线到达回调(默认实现:追加到缓存)。 + + 子类可重写以更新技术指标缓存(如 MA、RSI 中间计算结果)。 + """ + self._klines.append(kline) + + async def warm_up(self, klines: list[KlineRecord]) -> None: + """ + 策略预热:加载历史 K 线,初始化内部状态。 + + 回测引擎在开始回测前调用;实盘调度器在策略启动时调用。 + """ + self._klines = sorted(klines, key=lambda k: k.time) + + @property + def is_ready(self) -> bool: + """策略是否已就绪(缓存了足够的历史 K 线)""" + return len(self._klines) >= self.min_klines_required + + @property + @abstractmethod + def min_klines_required(self) -> int: + """策略正常运行所需的最少 K 线数量(如双均线策略需要 slow_period+1 根)""" + ... +``` + +--- + +## 7. 回测引擎设计 + +```python +# engine/backtest/engine.py +""" +事件驱动回测引擎。 + +核心流程: + 1. 从 TimescaleDB 加载历史 K 线 + 2. 按时间顺序逐根喂给策略 + 3. 收集信号 → 模拟成交 → 计算收益 + 4. 输出绩效报告(夏普比率、最大回撤、胜率等) +""" + +from dataclasses import dataclass, field +from datetime import datetime + +from engine.data.reader import KlineReader +from engine.data.models import KlineRecord, Signal +from engine.strategy.base import BaseStrategy + + +@dataclass +class BacktestResult: + """回测结果""" + symbol: str + interval: str + start_time: datetime + end_time: datetime + total_klines: int + total_signals: int + total_return: float # 总收益率 + annual_return: float # 年化收益率 + max_drawdown: float # 最大回撤 + sharpe_ratio: float # 夏普比率 + win_rate: float # 胜率 + profit_factor: float # 盈亏比 + equity_curve: list[float] = field(default_factory=list) # 权益曲线 + + +class BacktestEngine: + """回测引擎""" + + def __init__(self, reader: KlineReader): + self.reader = reader + + async def run( + self, + strategy: BaseStrategy, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + exchange: str = "binance", + ) -> BacktestResult: + """ + 执行回测。 + + Args: + strategy: 待回测的策略实例 + symbol: 交易对 + interval: K 线周期 + start_time: 回测起始时间 + end_time: 回测结束时间 + exchange: 交易所 + """ + # 1. 加载历史 K 线 + klines = await self.reader.get_klines( + symbol=symbol, + interval=interval, + start_time=start_time, + end_time=end_time, + exchange=exchange, + ) + + if len(klines) == 0: + raise ValueError(f"回测区间内无 K 线数据: {symbol} {interval} [{start_time}, {end_time}]") + + # 2. 策略预热 + await strategy.warm_up(klines) + + # 3. 逐根 K 线回放 + signals: list[Signal] = [] + for kline in klines: + await strategy.on_kline(kline) + if strategy.is_ready: + signal = await strategy.generate_signal() + if signal is not None: + signals.append(signal) + + # 4. 计算绩效指标 + return self._compute_metrics( + symbol=symbol, + interval=interval, + start_time=start_time, + end_time=end_time, + klines=klines, + signals=signals, + ) + + def _compute_metrics( + self, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + klines: list[KlineRecord], + signals: list[Signal], + ) -> BacktestResult: + """计算回测绩效指标(框架方法,实际计算委托 metrics.py)""" + from engine.backtest.metrics import compute_metrics + + return compute_metrics( + symbol=symbol, + interval=interval, + start_time=start_time, + end_time=end_time, + klines=klines, + signals=signals, + ) +``` + +--- + +## 8. 最小可运行示例 + +### 8.1 安装依赖 + +```bash +# engine/pyproject.toml +[tool.poetry.dependencies] +python = "^3.10" +asyncpg = "^0.29" +pydantic = "^2.5" +pyyaml = "^6.0" +loguru = "^0.7" +pandas = "^2.0" +numpy = "^1.26" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" +``` + +```bash +cd engine +poetry install +``` + +### 8.2 运行示例 + +```python +# engine/__main__.py +""" +最小可运行示例 —— 验证 TimescaleDB 连接、配置读取、K 线查询。 + +运行方式: + cd engine + poetry run python -m engine +""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from engine.config.settings import print_config_summary, db as db_config +from engine.data.db import get_pool, close_pool +from engine.data.reader import KlineReader + + +async def main(): + # 1. 打印配置概要(验证 env.yaml 读取) + print_config_summary() + + # 2. 测试 TimescaleDB 连接 + pool = await get_pool() + async with pool.acquire() as conn: + version = await conn.fetchval("SELECT version()") + print(f"[db] TimescaleDB 连接成功: {version}") + + # 3. 查询最近 10 根 BTCUSDT 1h K 线 + reader = KlineReader() + end = datetime.now(timezone.utc) + start = end - timedelta(hours=24) + + klines = await reader.get_klines( + symbol="BTCUSDT", + interval="1h", + start_time=start, + end_time=end, + exchange="binance", + limit=10, + ) + + print(f"\n[data] 查询到 {len(klines)} 根 K 线:") + for k in klines[:5]: + print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} " + f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}") + + # 4. 清理 + await close_pool() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### 8.3 完整启动流程 + +```python +# engine/run.py +"""策略引擎完整启动流程示例""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from engine.config.settings import print_config_summary +from engine.data.db import get_pool, close_pool +from engine.data.reader import KlineReader +from engine.strategy.ma_cross import MACrossStrategy # 双均线策略示例 +from engine.backtest.engine import BacktestEngine + + +async def run_backtest(): + print_config_summary() + + reader = KlineReader() + engine = BacktestEngine(reader) + + # 策略参数 + strategy = MACrossStrategy( + symbol="BTCUSDT", + interval="1h", + fast_period=5, # 快线周期 + slow_period=20, # 慢线周期 + ) + + # 回测区间:最近 90 天 + end = datetime.now(timezone.utc) + start = end - timedelta(days=90) + + print(f"\n[backtest] 开始回测: {strategy.symbol} {strategy.interval} " + f"[{start.date()} ~ {end.date()}]") + + result = await engine.run( + strategy=strategy, + symbol="BTCUSDT", + interval="1h", + start_time=start, + end_time=end, + ) + + # 输出绩效 + print(f"\n{'='*50}") + print(f"回测绩效报告") + print(f"{'='*50}") + print(f" K 线总数: {result.total_klines}") + print(f" 交易信号数: {result.total_signals}") + print(f" 总收益率: {result.total_return:.2%}") + print(f" 年化收益率: {result.annual_return:.2%}") + print(f" 最大回撤: {result.max_drawdown:.2%}") + print(f" 夏普比率: {result.sharpe_ratio:.2f}") + print(f" 胜率: {result.win_rate:.2%}") + print(f" 盈亏比: {result.profit_factor:.2f}") + print(f"{'='*50}") + + await close_pool() + + +if __name__ == "__main__": + asyncio.run(run_backtest()) +``` + +--- + +## 9. 性能考量 + +### 9.1 查询优化 + +| 场景 | 优化策略 | 效果 | +|------|---------|------| +| **大数据量回测**(> 10 万根 K 线) | 使用 `asyncpg.Cursor` 流式读取,避免一次性加载到内存 | 内存峰值从 O(n) 降到 O(batch_size) | +| **多币种并发查询** | `asyncio.gather()` 并行查询多个 symbol | N 个币种查询时间 ≈ max(单币种时间),而非 sum | +| **重复区间查询** | 策略层缓存已查询的 K 线,避免重复 DB 查询 | 减少 90% 的重复 I/O | +| **高周期 K 线** | 若 TimescaleDB 配置了连续聚合视图,直接查询物化视图而非原始 1m 表 | 查询速度提升 10-100× | + +### 9.2 内存管理 + +```python +# 流式读取大数据量 K 线(避免 OOM) +async def get_klines_streaming( + pool: asyncpg.Pool, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + batch_size: int = 5000, +): + """流式读取 K 线,每次返回 batch_size 条,适合百万级回测。""" + async with pool.acquire() as conn: + async with conn.transaction(): + cursor = await conn.cursor( + """ + SELECT * FROM klines + WHERE exchange=$1 AND symbol=$2 AND interval=$3 + AND time>=$4 AND time<=$5 + ORDER BY time ASC + """ + ) + # 省略参数绑定细节... + async for row in cursor: + yield KlineRecord.from_record(row) +``` + +### 9.3 向量化计算建议 + +策略中的技术指标计算(MA、RSI、MACD 等)若涉及全量历史数据,应使用 `pandas` + `numpy` 向量化计算,而非 Python 循环: + +```python +import pandas as pd +import numpy as np + +def compute_sma_vectorized(klines: list[KlineRecord], period: int) -> np.ndarray: + """向量化计算简单移动平均 — 比纯 Python 循环快 50-100×""" + closes = np.array([k.close for k in klines], dtype=np.float64) + return pd.Series(closes).rolling(window=period).mean().to_numpy() +``` + +--- + +## 10. 风险提示 + +> ⚠️ **风险声明**:数字货币交易具有高风险,本系统仅提供技术执行工具。策略盈亏由使用者自负,开发者不做任何收益承诺。 + +### 10.1 数据相关风险 + +| 风险 | 说明 | 缓解措施 | +|------|------|---------| +| **数据缺口** | TimescaleDB 中 K 线可能因 data 模块采集中断而存在缺口 | 策略内检测 `time` 间隔,缺口处跳过或标记 | +| **延迟数据** | 网络延迟导致最新 K 线不完整 | `is_closed=False` 的 K 线不应参与信号计算 | +| **精度损失** | Python `float`(IEEE 754 double)约 15-17 位有效数字,对标 `NUMERIC(20,8)` 足够,但极值场景需使用 `Decimal` | 价格/成交量使用 `float`;累积资金计算使用 `Decimal` | + +### 10.2 策略相关风险 + +| 风险 | 说明 | 缓解措施 | +|------|------|---------| +| **过拟合** | 参数在回测区间表现优异但实盘失效 | 使用 Walk-Forward 分析,保留样本外验证集 | +| **未来信息泄露** | 回测中不当使用了当前时间点之后的数据 | `generate_signal()` 仅允许访问截止当前 K 线的历史数据 | +| **滑点与手续费** | 回测未考虑实际交易成本 | 回测引擎中配置滑点模型和手续费率 | + +### 10.3 跨模块一致性 + +| 检查项 | 验证方法 | +|--------|---------| +| Python 模型字段与 TypeORM 实体对齐 | 对照 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts) 逐一核对 | +| env.yaml 配置项双端一致 | 对照 [`data/config/index.ts`](../data/config/index.ts) 的 `EnvConfig` 类型 | +| K 线查询 SQL 的列名与实体一致 | 运行 `__main__.py` 示例验证查询返回数据 | + +--- + +## 附录 A:依赖清单 + +```toml +# engine/pyproject.toml +[tool.poetry] +name = "trade-engine" +version = "0.1.0" +description = "数字货币量化交易系统 - Python 策略引擎" +authors = ["Trade Team"] + +[tool.poetry.dependencies] +python = "^3.10" +asyncpg = "^0.29" # 异步 PostgreSQL 驱动(TimescaleDB 兼容) +pydantic = "^2.5" # 运行时类型校验(与 TypeScript Zod 对称) +pyyaml = "^6.0" # 解析 env.yaml(与 TypeScript yaml 包对称) +loguru = "^0.7" # 结构化日志 +pandas = "^2.0" # 技术指标向量化计算 +numpy = "^1.26" # 数值计算基础 + +[tool.poetry.group.dev.dependencies] +pytest = "^8.0" +pytest-asyncio = "^0.23" +mypy = "^1.8" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" +``` + +--- + +## 附录 B:TypeScript 实体速查 + +> 以下为 data 模块实体的关键字段摘要,供 Python 开发时快速对照。 +> 完整定义见 [`data/db/entities/kline.entity.ts`](../data/db/entities/kline.entity.ts)。 + +| 实体 | 表名 | 主键 | 备注 | +|------|------|------|------| +| `Kline` | `klines` | `(exchange, symbol, interval, time)` | TimescaleDB hypertable,复合主键 | +| `Exchange` | `exchanges` | `id` (UUID) | TypeORM 管理域,继承 CommonBaseEntity | +| `TradingPair` | `trading_pairs` | `id` (UUID) | TypeORM 管理域,外键关联 exchanges | diff --git a/engine/config/__init__.py b/engine/config/__init__.py new file mode 100644 index 0000000..2358193 --- /dev/null +++ b/engine/config/__init__.py @@ -0,0 +1,11 @@ +from engine.config.settings import ( # noqa: F401 + DB_DSN, + DbConfig, + EnvConfig, + LoggingConfig, + RedisConfig, + db, + logging, + print_config_summary, + redis, +) diff --git a/engine/config/__pycache__/__init__.cpython-314.pyc b/engine/config/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..67627b4cd6804c62cf642325449fe2685c087c1c GIT binary patch literal 389 zcmXw#&q@O^5XO^i_g_nkhvL8%ze}*b-cO!C@&5K^U#u0|GZb&+kxc0=5frFkz(fFn z=s-t=5Q+#Q5ko8zNJI+h5t*`MG75ftz$kT5b|tg>iA%Eib*-Pu!lgI*wUzz5x+qFr zw6-@@d1+h2^0|wbP2UYyrqm!8{|VBrt7>L8E;N3AGU~cRr7=g1n}M-a02pgcfR8X8 zkI*CXh&>XI)WiRQ=^;MI7D?PWf3V6}Zq$p~@T^{}E2Y8m>pE8)%naFV=5xAP)c{KOWBU>VFuXv1DqOX(*OVf literal 0 HcmV?d00001 diff --git a/engine/config/__pycache__/settings.cpython-314.pyc b/engine/config/__pycache__/settings.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..97002ec45a188e15487b635e7a3db6103e571f5b GIT binary patch literal 5744 zcmb7IeQ*=U72lIi-?rp0Y+{*M1`;AdP;5vrCZxm|2J>N*BQc$TxysVnC^<`F?*y@@ zlM#d#XhL8n0i2c)dzu7jn_#Ayga8?!%}oC5SOse5hEACV+3;6z>?D&rS2WR4sz`hjUxfyjl$Hk4({CA{4e1`RD(zCyxk4(6eC5o?_@(IAsrxd3?&A z66mR!s;9=TmJ2gL6QxWOZoxQ*c+281f$XhwAEqw7ntJ`{onIXs{`{k%Q@yEoj(Pr_ zcnkhKx6fQi9Xfia_p{;OeKgdAH1+urj}+Oturtshe{=D5e`hEVQNuycP*3ljKfIng zb1}8=eCp!);j)%~l^!qz%!K zBx|PCVMz`JbGKm5JdCadCIFt+LR1dC&9Kd`8cRtOd`{&BgKBCqgEtwsaRM)xmgwGO zX(o#}!3?_wj}t7at(mBH!3w*#Y1W3;mC;CBxIO+5O#NKALsEhPS!$^F_&oloqPA0M z)3fV6w=bO-dgJ}np*=%K`ZSvy1)1&88?K!}Ncj8j3Go^=OGs)9#AQ`8v`d;nmV)R* z89yRC@VK7QW*|>-3DQiq8EW?u5`xwW@+`NV^N~&57P8DBaOhaTbHE!iJhmHics_?W zArHGT-!TBA@whULk3}iaNT5T~__!ib%^C|R%C0C4fx3-~N;P9Rq5^Mji_(sOs^yDf zPzI4uIH-!Ef^*GIXv=)h!(hcnrI(~mUqdt)?~o#@;#&!}@~JccREo5RBa$!3^ypJ0 zRSielm4&fREl(5!kw{bxsFEmZGjv@q%xy*tQC0%^&pxt;jF^eTndIFg7Q%5OR`#%H z#D))cc-Y-I;y^wx&F3TUB(9>}8@_b`KMK>3)1=Ak(5#|}dxzKBMX@6qip$8uf_B6M za;C>6ifv)4s4^@+5`}JyD2AdzQKSVh549jE1fmt@EVwNm0SRpes0j8I4vGO)rQz1N zDk;zml367NC0Q25(;S@%b13KuE9e6_r%fkp@3Y|v_MRYL@xGe{?nJ{^1v9!sJv&Z4 zf309qqTwEH{#8ON^g%&jvIVV#WwaK^gvcnF7#V_5HM2P2SsYjdlWK)HunA`H9E;`< zq)=GNghCO_#hx>MpAO-!Qi?KlK3~@V>z-V+Wa)ELJ33;}{W>}<#YkN+P&3A7z*IAeoB|kMB0uIiK9oVi7L(&S zr8gTlCRPnTQJ+{d!E;b{YR^HvO#q46g=B33vUjc*gLh4JPqR0`ACjeC7t z5?sJ!AffV}F$eQeVuE%N?2Pa~ifA)cCLwTJp(kB@+#GV+kk+A@N@|=&C{Ezz7<@8_ znmrDj6hjt4O(_PFAopM)OGl6#U(71D^$FgEOJU1-nT^=0r#xgUzK0b(o%WmbmhoCo z#{K5;Hl6#aZQv$c6L<a4*}GWGu+^$d`|Ub6URRN3gFbW2D46l5-xn*%&y8&mKd_#BeS~1a_0(j?Z0DNc zzT+W}lI#w^N);pw_taUWlKs}tU1pp`!kpLIWz|70J_lyplFn^gJVVENhW>mcb0bRM zm5%Pab>WjbPaIDP`|UHI4)=a^>(V=uu1l#8yHm$cWiL%|57Ic=Ws0k9kJWoS^K~Atg?X!HmR=4ks-hLH2Cr|5s;i^% zNN5#175QjPifBCE(=?kBXp=-a8VG6j=71cRn249xcyxTthC*TtZe|+S27U;~slvlr zJWv=sK(ydsk;$d?L0Uc2GFj!D>xa)dBRig2xqh!G0PCDEJj#+w)qtRu*ON=)An4Q?2`@`MEJ-Y^MzUww$U-b=F@u63K z^GbK%s>`#V2UxT}e%DN_d3%0&!#T6x{mfd-)&(!~d&b zVsph+R&Ox<#o(wfvUTOh(ib{qhYU%q#?@)u3XStkjjfn&PmQgD0Wmd`sM1v+*TjHj zy0BdbQc5YC@*XjBP232h-Jx{-!^p)5fGFfgdnQFLV0#u;{b-;`1!FKrVhT_N31f~# zHtNPF0tr4xVv1;l$g=9uSeb(>f(hP^if6^|)Vc9`DX^g`7OT_B`SId4sX)wI7H4+9 z!8QGRa$|qO3awl5dc%spMCZn9*t;!90&O;_z|8#k-UU1Of+PMU2n`*0Gj*`<_TIBY zM-HV9zcKvVy_wG`AMCmP!R~J^?!R^JVCwK6Q)hlX+_QVA`;DQa`@Xq&psO?+?m3?y z>O2(BaJmIa3y`jYtQfo6)OorLK3|vF=bJx&{)(=d+1^YtTjya(^zttA0)(CAP2N1s z6r*tEr$56v@pw;Pb5YS0bwXDl!P8HJlQSkm!cCvSrtwsD(g9k+CKS_XAS&^WjsWeX ztuTf;d>R#Q@wWcF+3(eyT++Lw$9t;!Pr{kar#GK!?0fcO`suE-yZXZCc6?FvW!aUo z%eH>|+H3r}8@z4MUVhp5Mf}UoE1lO?_>VvF?$WoH4!9R2-3v}V*H?SZ{nWMQEd$MB zvRUje|3$wwFlIEE%wuL^viIlBeQ)#0mfn`0jeSpEoHtOnHd(i}f9bk@`}%+I8(5@y zE7;{tvmo5b;nsBCU{*V00KpWGG`b13V2x$IY*7rO^!I>?87E8M7z{Hk)Oe8wc4S9F^d+LIy9H=2zfcOs~ua=!26Cp_v=%#fD8yOp}w77VA^#uNhC%7hxoz z15y48@Iq@?Cr8{TTfw6&B Path: + return Path(__file__).resolve().parent.parent.parent + + +def _load_yaml_config() -> dict: + root = _get_project_root() + yaml_path = root / "env.yaml" + + if not yaml_path.exists(): + raise FileNotFoundError( + f"[config] 无法读取配置文件: {yaml_path}\n" + f"请确保项目根目录存在 env.yaml。" + ) + + with open(yaml_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + if data is None: + raise ValueError(f"[config] env.yaml 解析结果为空: {yaml_path}") + + return data + + +_raw = _load_yaml_config() +_config = EnvConfig.model_validate(_raw) + +db = _config.db +redis = _config.redis +logging = _config.logging + +DB_DSN = f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" + + +def print_config_summary() -> None: + """打印脱敏后的配置概要(不含密码明文)""" + print(f"[config] TimescaleDB: {db.user}@{db.host}:{db.port}/{db.name}") + print( + f"[config] Redis: {redis.url.replace('//', '//***@') if '@' in redis.url else redis.url}" + ) + print(f"[config] Logging: level={logging.level}, env={logging.node_env}") diff --git a/engine/data/__init__.py b/engine/data/__init__.py new file mode 100644 index 0000000..6634846 --- /dev/null +++ b/engine/data/__init__.py @@ -0,0 +1,3 @@ +from engine.data.db import close_pool, get_pool # noqa: F401 +from engine.data.models import KlineRecord, Signal, TradingPairInfo # noqa: F401 +from engine.data.reader import KlineReader # noqa: F401 diff --git a/engine/data/__pycache__/__init__.cpython-314.pyc b/engine/data/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bce53ef223ffe7c012bd2516d92e1bdd5fad8ceb GIT binary patch literal 383 zcmZWj%SyvQ6rIVdvCMpsyrwX80R8Z&vPBL2Hqlz`{amKlPq=wVzIx`@kLbSVZs0<-&9nX3|- z7JaJ*7JC0y5UeMLBs{Y?n!UL?tshTb4AayGeaub^V2I4D3*m#lWwXAiX l$BUH-k12B5zzL4XiPLF!2tHiSdB;S@l2?{=Y=Q%teggAUXjA|I literal 0 HcmV?d00001 diff --git a/engine/data/__pycache__/db.cpython-314.pyc b/engine/data/__pycache__/db.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e1113003a6b39621b163ae657a3b5c1e06bba065 GIT binary patch literal 1782 zcmah}>rWh26hC)%XLs49Jgl%r9^C~}T_3CljkOXVEs1SH2pu2z0e3R&47jj6L+;FK zF_^4CWov8O7y|WCwTZR$gHmInEt2>zv^55t5H;fP_=E@s^^@n`-Gvs3p3Izk?)#kc z``vSI$u2*j+;(Pf@81HzJnIw$`&`x}Y!t|Ve!BNxuBE+DNpzR8RcYYo zA1!`sOpTgR zs`Y6D(H0|~PHDPjMh_aQ7PY8?Iu~YCRV*c{b~W}7*u}D}=(=GkmL|(IfCm-fZw66i zU;_fAe3hy9y&o38Q_RgZoPs!01@WOV4B%cHM57M&;Lfe^y6<5@cWwsR?k;TGYQxa! zDfD7O%pa?j1j!X8lKT-cH~uu@-HYivN;0li(GTWLH`Id{TKx~Z5+ov~pTPA?r@nSh zk1u4$o$PzgH&da1VZSn+T^N1a$)0mAo^Vc%uS}nGK7MO)?9$5gafYm2qiWqsI%&z6 zHQiL=R>II_swt{$=*aln~7I7w3MSct2Az;Qi`t17IMf)TXreJ z^0G!{Q?smu-eX4GwkY=_gw%`P+JQJ8jp^qjEKi>TEyjkjH)H#HG%GiC2X;9)Poc4l zNnZDWZ`aUkbCoqSm602jk-4(U*$VuEv(>e;HO*JlYt;wW1D>FN13cyathinR#pRjS z)l%&JWwI-V?r$cOWWW1zpqcE<;A&j8GlM08v(!ZyM3mvzMj34)gmN!ADR~T>R$Q@p zb2lo+Es<2LGIh3rH$3KBc!hZ9c-4!>c3$0uV@%XA=U1+Sx}ELXCJ*KN+gZe`2r{dl z|C=ll(#TnGj(xd2@!4j*K+ZT*V@Q(aOvV|VTA281>GE)nGgt~}1@5rLc+x^|LhyzfM6##eRCS2bI9`fCfsX6YvEc_&cLTyBuZ#}9K6pm` z-h1bD2>%`c@&e&`jRa^duD89itQztBJ$7ZTx;3IZ-HD#Y!fT`KNl}&|yGZR~&f+Zh zp0XE386=#X=p#FoSJNl+kFPmpsOhBkIIY7O?kLg?{`}2F2>B5@=Aq;#*#C1)I3s=s Q;XgcBDL1N-(%j2`06-(CzyJUM literal 0 HcmV?d00001 diff --git a/engine/data/__pycache__/models.cpython-314.pyc b/engine/data/__pycache__/models.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c0881b9bda264b54882cf1bab3672d0a1539ce12 GIT binary patch literal 6083 zcmbVQd2kcw75`RR-IjIuKEP|h6k)J(a3tK9jg1eeI67%DJF>I}HL_$@D-*YwR!%|^ zCQwWgCn6;oGm~bLc0$OcZ7ATZq*68R=uFFBW0wXc<2h^(RJ(AY2|hVgCy01)S!|YC8-glCY99WF)t@BOLQ+q zi21;va3UOHW=5{goVzmf>E+}{Z!zD;-i04?cjQv?)$@$sD|p@h9yb>f0z!a`xS8o| zpU-~q8gpo{pWE;0WRh1tnjU-ShuAA-bMnGia`@cr^-q#lKDc-D12e;{WoEy;0kY|9 zzsqw_Qi`gpMEZ2Kak7&qz%rKYxGb%3rXx zv0Jn6e#&G%J`(c2$gw_egu8e1Z1Ri0C9l7meHVVH&)dfZcY~U7GmExTi!*0pGndZD z{mwS(_d-{F(ALcP;pxBs{?4u8JD1PSy#Lpk(aU%Lc$R5o+V(UvGZ(Mld36jdW=+ObSugkPASS1at ziJ(y^_80`)5hB<<8kl>dXpx57!}-FzKROIwVQyz;uiuiF!JR8tlVc;&gwDiAlUH9$ zp8jg~s~hr!GTEV(nrOl>;P{Crc%n%G&tUjep5sHk1(O1f zzF-*Eea@BNRmr4)>p(ad&70eNSJ|Y1@5yLb;MklN@_p4N1zHdwrg^p}I>`1w0AW)P z_4r|v$F`GwVg8pawv#m})DYkipiDR#5|rV5sG&uZ0u2NrtTfbq#r@%RlQl7S9r6J@ z6TI=#z>Qcjw-m@0Dn^cwW18n)Bg7BVTJlrs1m&ST8aD;et-XuZqMql3C?Dbt@bDP{ zB+~bW`97~8T3PrGZzSRm_ym@Xpz*SpxY2z$!toI|&%MYEy7z>AF!v!L;@%C=4A=ejLMT~K&Hu)X+fsVlhH?& zA;y+zwRT-b18>mfSm_>NrO&a_Kf=n8V`X@Rl`+T4_y{W#e6vZd<4SNjJ{ENW9=n4Kg`aFLt_*)D3<5HKde0`pO@lmCa!r3cUx1?A?FKr&% z5enCz9}2Qsw6v)$Gh<=**fUl|87sTms)wS zWrbLw4r-O+Ik1ep1z{2+|F${r6NlYA<*113-&>l}0{>lEO?=-sWj{&j;31_arPc9O z-;^v*8Sv3a%4=n<2_EOo#91*s5bquN&4goR%7UVz9FY}8#l%rQ+#5eU(l=pWk+Px4 zPD(1{`uM32Z4=H_DF=$2q_iqtK5Cq-a!r)1PL+TxRZ2GRr2esVa?^p^n~q*8iw}&3 zChOKuRBagV>0L;bp-DN}OLbD?o!-fop4%;K^`OJ8?cOg{+7W1{Fq9(p!v~jYgX`7Wi!ivr#OlaiA=Xb?!bV-w6&;oRXS2dQ6>%^DB25{!YTfY+#a zt%|2JyasG^DqgSR4JzKK;!PP|3-8E*XK2g(7@g%Yx+0Y>2f(3CYeuI7zF1|KlR`je zdyLMGItY0Ok4cFKge5#CGQJE+Ig$z_l}M_PV0z+fkklesf&>#cUx$Q2QjcUA5SO0E zyeDcS0?#i;Db5wY0!af9mxc#CBD@tyCRa-l%G*$C2ZDEG9>~HQUq>t`Uyu9-ncs-~bHr>L+LwMF`AwwAG1QUX zjQkd2#WuGhzk?zrH5b>9xD)iM^iB#zyC_msf4S^KTY_#%@0LV+C{n%ba>Ivp3EG`* zmP9QSaaLbkKC&c1H>6=B3XZK5v6m+3>U5j*xQ`-^s*9DA_GJm$m2Q`09ctPAPLxT@ zT}C*EXk^(C?De1wF|#aGDA6GDP@tTQdV?8_jb(cSd_=(QW+)751IzluK9>Cvb0u1& z!l{pgJ+q%*1w$@6R5ea-Gz9I4Wl&T>H5Bjx_6mHUCn|6e5QAm5?DBEJAj^J6$+L=e z#twN>u~?HTHfd7ZI4() zD`77R5uz7us$F|qs5czNE=*tlcqaZ1Y|QVTJ~umdW%}0TnV~OW%YSF=#@#p0&z^Y` zw%XG-PAAWvP7eQR_Vf2jXQ2&XE2n}4T^8yJ=oILq!A^oc7$ma{BNBKZjEH6_+e1Ol38F>L z^74bNpAYyrwjZ)^AOhzJq7&9Jp27^OWQM*#h&?Gl9gYmt;nKl_sQ3DWzyRmc@s02f z8BQc%ks#)Yz#`V>O=Od5owHrgWaPI%WW>?S$YQ`KHp<9iLf%Xqr9=DE7UUtBmnG=Z zv=twUNqJR*u1wqT!A`1b6Ld}5fe%hnTz-K&$0q3I=@Jx_ssN@81?9x*NYEAcE8syD z{1$!<_H!+gCM0eoPa)ZgWE+z0NS;CRERuCd)&q&CB5)%Lkw~3nfPqfN$hY+2l$|gu zVtc1ntb!mcsfe{az;95(3SDx-N~uie20)V&R!0}Zs#^#98d!rKdSfOG;h0AU$2>X+ zExiE8cJ-v&d;pV7-sLfD#IJ@+kdJ`im=6wmtR9n43;}8L=y0ux`h$Vva1a&693sad zgkSRf*G<2299y;P@Cz>=Z0_iIxpjZX9@h#uxia(vW^sI|OC<1Q^K-r4Xi#8#VKs&M z!NO(5E70`+5qngD5PzDc(C5} zMj)I_Siu8S_w~DIIZPHK1ZU&1saGtKM>Z!gj)GU@=RgVt1_r?PbS5&2N-s2@+n%87 zmGH1tT!@|vC+HPQh*<3yFPRXrS0?Bk_c26NYu^6oHIIvr-;1ObNgI-VNU&d`J}US^ z;Zr<<^ONFX7uq0^`nDOEOgxmQ^`vr1{NVWNZ4(vSAq_35k2Oz0gtSgMOJgnHR@YrM zer@~GHc`DJ*7g8DT?rLztqX8aica~1V4~oxS1Nr(WB;Ih0?Q-3^HNu&yeNgH^gR|- zV4*8-R*YNZWWS9+05S}V2i#xOro6><|o-aYBM$89M>P2vBAr6O}AAphQ?ocbgg5 z-K@hS*b{@Dgy0F`IeEckcqAbk3}gwJt*!joDkl!H)3sZZhb7xTwj$$fdDvfj&h1uP z@-ty-XQsBM3m;D3ci(&UednBem!_szDR_Q)c;OTBT#EVzUt$lVDI=RfX{MZ%Og&CH z={mZWmgz#&4RwZEqin2YWQM#O>sXnEbf(T!YnIIm&TYmFtn0E=CTXAG~li+l;p3hB zBLRP-z=zMD=>OowfusAv=Q<&)W|34gN#(Ru&S@v9R+5^erCOX0lA26XQ?yj8Gn=HQl2n_P znxv(sk<@f8HQAXVr?@CN)oFp%&(S~B;}gUt9=<7lrOD#PuM_MFFAara5AG$H5pE<4 z6x^jQsxg)aZ}X_Lbp2UcVac?D!bn13tAs5qRDM*tNfJbAl ziGtiDHo~Sdd!${B-q1FMS&v)DpqRD>#T_15NpZWq0goj4{9YKdgw^IYi#DvC>~?z^ z8-p@zP57Fe#IG64$%e904v0TCQ=^oT%^OB>`X_I$IlYFcVg_U;EDUbby|^$TQ07vF zusAMiJ?P_~tJGv9`;u(!<*ntPCKdcg;i6Y|_vRM0 zRJT3cXUlBg-MYJ*%N;gSc}s3f*y*19mGlX^j5+C8Nhhp3&N_bT$VL#;t-I5JtM4>| zq@4_rEJ-neG&s#5F^?(EgNC%H)Qos4;{H=i(ykp1K>!$kqb!I!0cxz0>=9)*LXg4< zjlP)nOYe41<2FHI1O6R;S+qm#B33a_ zD!Dip>!Mu-m(j(zOl4ZXusai5u(&K~)Fx|_p@eEOJOq;yvOdPdT3L@1H}083xvXW{ z80!s>z{FXo*-$rRCHwdh*vrLDhC-6!r*Kyn8Jm1i!KyM=r0qcX{Lx6~KM!?Y8f@De ze&tg5+5JFLDj}*&3al16d?C{IO#juZk$vsq=g&lro&5G%%d#DODZc{6GF5H|wG72F zA3t!e_1kN$gflC|ZBn_FhezE_<$V8@*CM}u5vUSc3BR$2uiRL*X+!l!s6dLvHe0zm ziYYr>emCiSVz+8%`bI712j6=?{N{7&XJVfnZ^inGZQ*hr8Tt6hNy})PJ2r4M0!|$m zyzngPhiv$AK60!%{Mws?*AJu3aQrk(oweTcEzUH4dF3}ap++zyG#2yQ{oA$|^MT+mig|A!C_x)Lg8?9*iRGVa3IaN7xxDOoQV`t@ zO}pF;9*HCu$EE3ICpJR}&Fu{~HOeqL2`B-2^+E5zYqH>TdmyKQ>dCzkHH>oMD{l2Oz&}n*94A ze?>z6I%jo_ldt?cvPgWzhN^_zHFaz2HuCdV>WM{+SO7m#h21L1-t7eJl%%a`^@m7M(GEo>tt&d}3SG z&8-^#F_l&{JRb_*zLiC#J#brE2Ho$;s#r=Np)2Mxojhpo(-rfX%N%GosKSbc%#A_@ z_4zc)g)GPf-ovjayjKg#_vAhBB!J;)C-NS$p)rB$Sl~Jn;kvj5(U(D`F-BYn-HD4( zEJpGD2&^88N|hi8fz4sFhba(*!0<&SFw{6CD+_8?B&#IUl6A65fmfBR&;*G+NFao) zz60fs$chEBGP&5v$m&t=qZuxyOzUp~Nts<{ASo`6q_`PLiknDMAq$YDWj)gFxF<%i zg_H}dd!VqN!QU#4#yFkEjzcx;;Gy@y%>(cqy#732&qq#mh2MW?sPkO7qeZ7NKzz-w z>$nj9)fs^J@b$Nbnokb>X5YZxb1|Cgzq()L#4%h0FxlwbRKv%$)Hd|Y_3-|Ks!JF- z_FKKH{>ul$=i4GLT~bLCnXlf{^EdoQm?F<#7XNjw($-YRYaCl5#8=l$j`3r(ah4?* zFQQFP&_G-Ynqn4(o!|-zMb_~UKZYp9SmiDhB`8X@dg!1_5;`0N+{uKg;GT{k9soj> zbc`<;kMRlF1b0Mei9mhMBv7A1pnf*wbZ&lr^C|YCgwSC6r_SqaQsl!VVN6u4pIP{0Q@EZe)BlsS6E`$5E@pJ8U*ls1>)bRJYb6B0URQ6Y<>tMJO!1c zpMoHaa4*0-1%faFzNn0V8izb!K@D6QM>GkwWXdu7oQJBHk^wC zva!UGjb%o%u}nfX){wT?S# zXE7hqcF;e{rcpvJ$XP7N1j{QE8YY(4K2W}=%p6;t8ZE=hHX$_qQQm%e`2ux+n>W= zlfYoxc!vfX)SV_ez&)nNDO|eOA7{cfKu{aP+|GnEhsU&lI0YWRGr>@gnQ{pw61L5x zCCu@gY0?thj3q3Sl~`_qFuP9=v#arm>tHS(gTLl+CZo0K46Ldaqu|%ouBmeI;r)9D zuI>XH3M5!V1Fu|)9C{s&m;0|A;zOQBpGV@W;UM@S(E|a?NXrp^W5r!-{0+Qn^bxa8 z3o@U&7QWVA&Tm)1}gj+h*aK$}SldWcA zO>Z(yFxmcEm`SyD{;zD5l>e{OcBh1{FgVo;gS$mx@Q^|=!XX9t*LYl@nABqkEmBUR zC<>>=PJCho{A8w!piE^9j73itq8*@`0X8%Pg4+Y$oo>MOz4u5@$?AVy@UijZnt!+URBrfe)8_6ik9WJb-rTaSXVdo2 zo(Oh_q;B~iZifD`=ZU9#lK<(8>;=QvZj>@wtivnNcF3d9c5o2e;T8jVx1~Dxx;lYy7t=g=tgh|Fg}PcR=4 z^@;g_Hrb5k1G$wDkFTW$2*?=KfDDP2A-SJ5sN{}O_nG;-4@yb%KoI`UWlM)C5MO23 zqb$CdFk<38m^*BSH52?-W+WLg0hvFNQyO2|^b@ zTh_tmwxQ!EBCT&l+S|2LJsxNUBOIsCKmj#O9^QWjUe!`aF>$w}5^g$aqqz%5Gj`#L zkkan26Iexc{dhT4q8=tEWD>eb{4zO|P~zcrb@lpLqr?mRx_WFY7RPY=eca0l0obfm}p$Pf8xS=tEc8&>X_oY1-!^2U7IVCNcMy1$CGOZ3c z{4IayIMKypGz=l#Z0MS)bTSy1A0!0)!q0O5MJdL#37KL1705g;SW$ z+zJPMk*=^YZ*!n^rB|dgH>?b(H*7S@bQWX+w4iZFpyhs0W`q{)x(oOkR06=@H9;mW z;;W;KxE7k78D2)9l!TFc0x$Ex$frOM_W)m1_W(661~e9UQ-KCYAA5A5sl8J}A3$RY zph2k7`kDYWjEfP{FpleD5+NoQ<=qA~^QjWJ?l!)}dK~k($H~OZY5*`+128xS39)df zsGIX+m12i_iM`>CmFdd^0K%ke0LT&m+yemU*8q^!0FZ6^ALuJ~tL0>ej(aM!>A^+y zGRbscSVDpVa5qoA@g(s&CJr@shTGqV%b`O@55q0af#1E+fBBt1BOaEkKpQtS^)9K3 zlmC?fSz|ak?sF1K{e=Nj1uI0<$*y!vi>Qii+;=H-aqQ{^r(V54gHs9x{4)F}i~bNC z^ryleAz-V95(u*55gfUQA>sK29KY$%nw+eKxzSFVo&q7qwCad^f-BX8t|B z`S*U3fA8hY&+=D|unbs~`Kvm(K3jJC)2&bU+6up8sg#12+V=X^`p=W|zsO%TjD?~U z1Lol(g3-{=?OSsp@wT)I1`1K7xfoT-Wjgafdz*&H(K`;%ZdmeBF371YU_LTrp>!~i z^3g0BWdRFP1lt8bR53UB8$ECn**^$q0-(AXlKtNS(iItpe4vwrda(2`W9VZLaA%pk z6xr}fEK)37BDK2RJA%F@AT&2#FMX=X6VP&OZueF&S!BE$-57*yv)kQKV z6ptarAEChS5+6pf4#m$vD0aBM3U^igUbvwI`=X%<&Q_tF^tcPIULg>0yJ2ADej<#X zT20tiF_5#2I7wgsAlR5X2~svX73p_!Hrh1z}dV}D_kAM?*IS* literal 0 HcmV?d00001 diff --git a/engine/data/db.py b/engine/data/db.py new file mode 100644 index 0000000..8816d7c --- /dev/null +++ b/engine/data/db.py @@ -0,0 +1,28 @@ +"""asyncpg 连接池管理 —— engine 模块对 TimescaleDB 的唯一切入点。""" + +import asyncpg +from engine.config.settings import DB_DSN, db as db_config + +_pool: asyncpg.Pool | None = None + + +async def get_pool() -> asyncpg.Pool: + """获取或创建 asyncpg 连接池(懒初始化,复用)。""" + global _pool + if _pool is None: + _pool = await asyncpg.create_pool( + dsn=DB_DSN, + min_size=2, + max_size=10, + command_timeout=30, + server_settings={"default_transaction_read_only": "on"}, + ) + return _pool + + +async def close_pool() -> None: + """关闭连接池(应用退出时调用)""" + global _pool + if _pool: + await _pool.close() + _pool = None diff --git a/engine/data/models.py b/engine/data/models.py new file mode 100644 index 0000000..0f4714e --- /dev/null +++ b/engine/data/models.py @@ -0,0 +1,100 @@ +""" +Python 数据模型 —— 镜像 data/db/entities/ 中的 TypeORM 实体。 + +命名约定: + - 类名与 TypeORM 实体类名一致(Kline, Exchange, TradingPair) + - 字段名使用 Python snake_case,对应 TypeORM 的 camelCase / snake_case + - 所有模型使用 Pydantic,提供运行时校验 + IDE 智能提示 +""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +class KlineRecord(BaseModel): + """ + K 线数据记录。 + + 映射关系: + TypeORM Kline.time → KlineRecord.time + TypeORM Kline.exchange → KlineRecord.exchange + TypeORM Kline.symbol → KlineRecord.symbol + TypeORM Kline.interval → KlineRecord.interval + TypeORM Kline.open → KlineRecord.open + TypeORM Kline.high → KlineRecord.high + TypeORM Kline.low → KlineRecord.low + TypeORM Kline.close → KlineRecord.close + TypeORM Kline.volume → KlineRecord.volume + TypeORM Kline.quote_volume → KlineRecord.quote_volume + TypeORM Kline.taker_buy_base_vol → KlineRecord.taker_buy_base_vol + TypeORM Kline.taker_buy_quote_vol→ KlineRecord.taker_buy_quote_vol + TypeORM Kline.trade_count → KlineRecord.trade_count + TypeORM Kline.is_closed → KlineRecord.is_closed + """ + + time: datetime + exchange: str + symbol: str + interval: str + + open: float + high: float + low: float + close: float + volume: float + + quote_volume: Optional[float] = None + taker_buy_base_vol: Optional[float] = None + taker_buy_quote_vol: Optional[float] = None + trade_count: Optional[int] = None + is_closed: bool = True + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + @classmethod + def from_record(cls, record) -> "KlineRecord": + return cls( + time=record["time"], + exchange=record["exchange"], + symbol=record["symbol"], + interval=record["interval"], + open=float(record["open"]), + high=float(record["high"]), + low=float(record["low"]), + close=float(record["close"]), + volume=float(record["volume"]), + quote_volume=float(record["quote_volume"]) if record["quote_volume"] is not None else None, + taker_buy_base_vol=float(record["taker_buy_base_vol"]) if record["taker_buy_base_vol"] is not None else None, + taker_buy_quote_vol=float(record["taker_buy_quote_vol"]) if record["taker_buy_quote_vol"] is not None else None, + trade_count=int(record["trade_count"]) if record["trade_count"] is not None else None, + is_closed=bool(record["is_closed"]), + created_at=record["created_at"] if "created_at" in record else None, + updated_at=record["updated_at"] if "updated_at" in record else None, + ) + + +class TradingPairInfo(BaseModel): + """交易对配置信息(轻量版,仅包含策略决策所需的字段)""" + + symbol: str + exchange: str + base_asset: str + quote_asset: str + price_precision: int + quantity_precision: int + min_qty: Optional[float] = None + min_notional: Optional[float] = None + active: bool = True + + +class Signal(BaseModel): + """交易信号""" + + symbol: str + signal_type: str = Field(..., pattern=r"^(BUY|SELL|HOLD)$") + price: Optional[float] = None + quantity: Optional[float] = None + reason: str = "" + timestamp: datetime = Field(default_factory=datetime.utcnow) diff --git a/engine/data/reader.py b/engine/data/reader.py new file mode 100644 index 0000000..301409d --- /dev/null +++ b/engine/data/reader.py @@ -0,0 +1,278 @@ +""" +K 线数据读取器 —— 从 TimescaleDB 查询历史 K 线供策略分析和回测使用。 + +所有方法均为只读查询,对应 data 模块 Kline 实体的字段结构。 +参考:data/db/entities/kline.entity.ts +""" + +from datetime import datetime +from typing import Optional, Sequence + +import asyncpg + +from engine.data.db import get_pool +from engine.data.models import KlineRecord + + +class KlineReader: + """TimescaleDB K 线只读查询器""" + + def __init__(self): + self._pool: Optional[asyncpg.Pool] = None + + async def _ensure_pool(self) -> asyncpg.Pool: + if self._pool is None: + self._pool = await get_pool() + return self._pool + + async def get_klines( + self, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + exchange: str = "binance", + limit: int = 1000, + ) -> list[KlineRecord]: + """ + 查询指定时间范围内的 K 线数据,支持任意周期(1m / 5m / 15m / 1h / 4h / 1d 等)。 + + Args: + symbol: 交易对(如 BTCUSDT) + interval: K 线周期(1m / 5m / 15m / 30m / 1h / 4h / 1d 等) + start_time: 起始时间(含) + end_time: 结束时间(含) + exchange: 交易所标识 + limit: 最大返回条数 + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time >= $4 + AND time <= $5 + ORDER BY time ASC + LIMIT $6 + """ + + rows: Sequence[asyncpg.Record] = await pool.fetch( + query, + exchange, + symbol, + interval, + start_time, + end_time, + limit, + ) + + return [KlineRecord.from_record(row) for row in rows] + + async def get_latest_klines( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 500, + ) -> list[KlineRecord]: + """ + 获取最近 N 根已闭合的 K 线(策略启动时快速预热)。 + + 仅查询 is_closed = TRUE 的 K 线,避免使用未闭合的不完整数据。 + """ + pool = await self._ensure_pool() + + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND is_closed = TRUE + ORDER BY time DESC + LIMIT $4 + """ + + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + records = [KlineRecord.from_record(row) for row in rows] + records.reverse() + return records + + async def get_klines_by_count( + self, + symbol: str, + interval: str, + count: int = 100, + exchange: str = "binance", + before_time: Optional[datetime] = None, + ) -> list[KlineRecord]: + """ + 获取最新的 N 根 K 线(按时间倒序取 N 条再正序返回)。 + + 适合策略运行时获取最近 N 根 K 线做指标计算,不关心特定时间范围。 + + Args: + symbol: 交易对 + interval: K 线周期 + count: 需要获取的 K 线数量 + exchange: 交易所标识 + before_time: 可选,只获取该时间之前的 K 线 + """ + pool = await self._ensure_pool() + + if before_time: + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time <= $4 + ORDER BY time DESC + LIMIT $5 + """ + rows = await pool.fetch(query, exchange, symbol, interval, before_time, count) + else: + query = """ + SELECT + time, exchange, symbol, interval, + open, high, low, close, volume, + quote_volume, taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed, created_at, updated_at + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + ORDER BY time DESC + LIMIT $4 + """ + rows = await pool.fetch(query, exchange, symbol, interval, count) + + records = [KlineRecord.from_record(row) for row in rows] + records.reverse() + return records + + async def get_ohlcv_array( + self, + symbol: str, + interval: str, + exchange: str = "binance", + limit: int = 200, + before_time: Optional[datetime] = None, + ) -> list[tuple[datetime, float, float, float, float, float]]: + """ + 获取 OHLCV 元组数组,直接用于 pandas DataFrame 或 TA-Lib 计算。 + + 返回格式: [(timestamp, open, high, low, close, volume), ...] + 时间升序排列。 + """ + pool = await self._ensure_pool() + + if before_time: + query = """ + SELECT time, open, high, low, close, volume + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time <= $4 + ORDER BY time DESC + LIMIT $5 + """ + rows = await pool.fetch(query, exchange, symbol, interval, before_time, limit) + else: + query = """ + SELECT time, open, high, low, close, volume + FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + ORDER BY time DESC + LIMIT $4 + """ + rows = await pool.fetch(query, exchange, symbol, interval, limit) + + return [ + (row["time"], float(row["open"]), float(row["high"]), float(row["low"]), float(row["close"]), float(row["volume"])) + for row in reversed(rows) + ] + + async def get_available_symbols( + self, + exchange: str = "binance", + ) -> list[str]: + """ + 查询已激活的交易对列表。 + + 从 trading_pairs 配置表读取(仅 active=TRUE 的记录), + 而非扫描 klines 时序表,避免全表扫描。 + 对应 data/db/entities/trading-pair.entity.ts。 + """ + pool = await self._ensure_pool() + + rows = await pool.fetch( + """ + SELECT tp.symbol + FROM trading_pairs tp + JOIN exchanges e ON tp.exchange_id = e.id + WHERE e.name = $1 + AND tp.active = TRUE + ORDER BY tp.symbol + """, + exchange, + ) + + return [row["symbol"] for row in rows] + + async def get_available_intervals( + self, + symbol: str, + exchange: str = "binance", + ) -> list[str]: + """ + 查询某交易对配置的 K 线周期列表。 + + 从 trading_pairs.kline_intervals 读取(逗号分隔字符串), + 而非扫描 klines 时序表,避免全表扫描。 + 对应 data/db/entities/trading-pair.entity.ts: kline_intervals 字段。 + """ + pool = await self._ensure_pool() + + row = await pool.fetchrow( + """ + SELECT tp.kline_intervals + FROM trading_pairs tp + JOIN exchanges e ON tp.exchange_id = e.id + WHERE e.name = $1 + AND tp.symbol = $2 + AND tp.active = TRUE + """, + exchange, + symbol, + ) + + if row is None or not row["kline_intervals"]: + return [] + + # kline_intervals 格式: "1m,5m,15m,1h,4h,1d" + return [ + interval.strip() + for interval in row["kline_intervals"].split(",") + if interval.strip() + ] diff --git a/engine/example/__init__.py b/engine/example/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/engine/example/__pycache__/__init__.cpython-314.pyc b/engine/example/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f7a3d9bd9a5748eee0afd3d887d17a49b7117162 GIT binary patch literal 159 zcmdPq5CH>>P{wCAAftgHh(Vb_lhJP_LlF~@{~08C%SJ!6IJKx) zzbG|3wNl?DKe;qFHLs*t-#I@eRllSt5lE)yrDx^=sfxthf}B+S`1s7c%#!$cy@JYH f95%W6DWy57c15f}!$8&*gBYKf85tRin1L(+bSx#6 literal 0 HcmV?d00001 diff --git a/engine/example/__pycache__/test_db.cpython-314.pyc b/engine/example/__pycache__/test_db.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..87e0f2588a8213d588e10a3cee9aa4b3e38f570a GIT binary patch literal 2148 zcmbtV|4&m_6u-Cc_1h~gU@IW|!gWj|5!!&jjKf8QVF?6TMY1@sDeVI*ZD04^tFvs$ zh?-b~#u{a^sTnLYt3OQHl1+3tX5t?J8=2(I#AMQn*$=`nTlU+|?HB17`)N0M_ug~P zz2}~9pYyhwj2wdT+wkVYZ7f2MD3KXJC&%(>SO$>;$><<*;ASjiSXIp`yGmBs8JV%O zGFw7rOtadqku}LZ+pM+gWSw0v>+PJ(l^}<@5Xpw_9EWE6+X#7z5qc4I<91kAkBz9? zun|?K+0{6;0S&k``ITuy3h=nZ@Oa|P_4w`ciP%{D%EiPtLkl;CAAf#({`;XvgU2{- zVJf;Xb~^FVt@z~Gr<0NR<=B(4TaPbYPmIzzerK4-qRYeE$o$06)5%j$CqLl`j9^$k zBm_u>pYVY`Z-BS*M_vA)kGIOaBs)Dl92Y-zCUG{JxH2~X-M5)r9>Lwui^MJXgM!49 zP>@u3lIlDz*+o(o#&3QdkDg9^d39mxBxxa!Z%?INh>VjCub-FPE+5~xn`{4`eJ0Jx3GOSpFqBkHr-9n((+vk)*e!oi$!$m`j4~%y3Zb9@sC%{cn z;oLqj%^4H~pF2C|c|aV8Z@EW>&?~3}lHHXKIzkTfN&>RDcuj8hStj#DCXpqmOTC&G zM4hNzo8hjk4wZvBsCKZgrM2vV@;MCqqj->MRdXnavux3H=`%MwpWBs#9NH>o#mY|X z(3NKuTOMi>5DKcY^yoWP?MXOfw58T|<3DLiWqg8HIJjl{_X&r=h#WZ}qtRV8k}@O6 z)~yl%nUtA8X88|UF31YjkmVUsRnCgPE$UAXaC96*(%3E(}2 zwjDd#p3o9(8&DB4pe7%Xy*mdOYgO+6L+r34t#oRjQzsQv_l9Wh39(wOB!24J{GE?U zh_*$la-eSK{}^RPMV$n&NDUGx1Qc^0FFX4wdL?I17?2GvDU6Q~C~T?`cqO4%5dAJ$ zVTB-NAM*Acf)m1f5CDp8MRP>(h5Wpz1AR15BA-OQCZ$!u{SpNfG&$vU+kS}`C7a0i z^I=<~;D&1fS+X?<&=FI8G1dNT>Go$!w?AtztgwEUHz1aPDq6wPdgvBUpldu}^^q+n z_QEpFRnM|DX?mV5nq`aUjYUJ(2r5#c@9JZH9CtT|URVvcSd**BX1 zMd4Us%sgt0?HkX(S@c!WMCrI`k-@pU@E-{4>+uqUOhwN$NN0%D9)Caj)`<`4tH#Uj z(ao3oWA#^r>6ffA&$MaVtgd>VvkaBrXA6F03zjy34@*TzUHF%@3Bq%%-q>ivcXM&$ zR`zaQg9V6tuj0lE_TDBf-B)0k6`j*bgX(manG~(l3BMDkQ_;GlaKP;qL}<)Nq^T2W z`6P&V7igW>8feSs$Xh8C3T>TYw7}s}P$L8JJ}@%&~@( VEyr5UvF9~EvxN{lwpgSf{~P+$9^?Q3 literal 0 HcmV?d00001 diff --git a/engine/example/test_db.py b/engine/example/test_db.py new file mode 100644 index 0000000..2d9f80b --- /dev/null +++ b/engine/example/test_db.py @@ -0,0 +1,44 @@ +""" +数据库模块测试示例。 + +运行方式(在项目根目录 trade/ 下): + python -m engine.example.test_db + +前提条件: + docker compose up -d + data 模块已同步过 K 线数据到 TimescaleDB +""" + +import asyncio +from datetime import datetime, timedelta, timezone + +from engine.config import print_config_summary +from engine.data import KlineRecord, KlineReader, close_pool + +async def main(): + print("=" * 60) + print(" Trade Engine — 数据库模块测试") + print("=" * 60) + + print_config_summary() + + reader = KlineReader() + + # 获取最近 100 根 BTCUSDT 5m K 线 + klines = await reader.get_klines_by_count( + symbol="BTCUSDT", + interval="1m", + count=5, + ) + print(f"\n查询到 {len(klines)} 根 K 线:") + for k in klines[:5]: + print(f" {k.time.isoformat()} | O={k.open:.2f} H={k.high:.2f} " + f"L={k.low:.2f} C={k.close:.2f} V={k.volume:.4f}") + if len(klines) > 5: + print(f" ... 剩余 {len(klines) - 5} 根") + + await close_pool() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/engine/pyproject.toml b/engine/pyproject.toml new file mode 100644 index 0000000..dd3fbca --- /dev/null +++ b/engine/pyproject.toml @@ -0,0 +1,14 @@ +[project] +name = "trade-engine" +version = "0.1.0" +description = "量化交易策略引擎" +requires-python = ">=3.10" +dependencies = [ + "asyncpg>=0.29", + "pydantic>=2.5", + "pyyaml>=6.0", +] + +[tool.pyright] +venvPath = "." +venv = ".venv"