From b5cdb4199336cb079ed0ee2b7c0989ce3a476870 Mon Sep 17 00:00:00 2001 From: Rekey Date: Fri, 12 Jun 2026 10:27:11 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E6=9B=B4=E6=96=B0=20README=20?= =?UTF-8?q?=E6=9E=B6=E6=9E=84=E6=96=87=E6=A1=A3=E4=B8=8E=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E6=B5=8B=E8=AF=95=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - README.md: 更新数据层 Node.js→Bun,common→engine/common,同步目录树结构 - db_test.py: TimescaleDB 数据库连接与基础查询测试脚本 --- README.md | 61 +++++++++---------- engine/db_test.py | 147 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 174 insertions(+), 34 deletions(-) create mode 100644 engine/db_test.py diff --git a/README.md b/README.md index 5403f3e..0054530 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ | 层 | 语言 | 职责 | |---|------|------| -| **数据层** | TypeScript (Node.js) | 行情采集、WebSocket 连接管理、K 线合成、数据写入 | +| **数据层** | TypeScript (Bun) | 行情采集、WebSocket 连接管理、K 线合成、数据写入 | | **业务层** | Python 3.10+ | 策略引擎、回测、风控、交易执行 | | **接口层** | TypeScript / Python | FastAPI (Python) 或 NestJS (TS) 提供 REST/WS API | @@ -45,7 +45,7 @@ | 分类 | 技术 / 库 | 说明 | | ------------ | ---------------------------------- | -------------------------------------- | -| **数据层语言** | **TypeScript 5.x (Node.js 20+)** | 行情采集、WebSocket 管理、数据管道 | +| **数据层语言** | **TypeScript 5.x (Bun)** | 行情采集、WebSocket 管理、数据管道 | | **业务层语言** | **Python 3.10+** | 策略引擎、回测、风控逻辑 | | **时序数据库** | **TimescaleDB (推荐)** | K 线数据存储,基于 PostgreSQL 扩展 | | 关系型数据库 | PostgreSQL 16+ | 订单、策略配置、用户数据等(TimescaleDB 基于 PG,可共用)| @@ -87,7 +87,7 @@ |------|------| | Node.js WebSocket 性能优异,天然适合高并发连接 | 需要维护两套技术栈 | | TypeScript 类型系统在数据管道中减少运行时错误 | 跨语言调试稍复杂 | -| **共享类型**:前后端可用同一套 TypeScript 类型定义 | 部署需要 Node.js + Python 双运行时 | +| **共享类型**:前后端可用同一套 TypeScript 类型定义 | 部署需要 Bun + Python 双运行时 | | 事件驱动模型与行情数据流天然匹配 | 团队需要双语言能力 | | npm 生态有成熟的交易 SDK(ccxt 等) | | @@ -940,7 +940,7 @@ export class TimescaleDBStorage { ##### 6. Python 读取实现(策略引擎侧) ```python -# common/storage.py +# engine/common/storage.py import asyncpg from datetime import datetime from typing import Optional @@ -1160,6 +1160,7 @@ volumes: | 子模块 | 职责 | 关键技术点 | | -------------- | ---------------------------------------------- | -------------------------------------- | +| **通用模块** | 策略基类、数据模型、日志、配置 | `engine/common/` 目录,基础类型定义 | | **策略管理器** | 策略注册、启动、停止、热加载 | 插件化架构、动态导入、`importlib` | | **信号分发器** | 将策略产生的交易信号分发到交易执行模块 | 事件总线、消息队列 | | **回测引擎** | 使用历史数据模拟策略执行,评估收益、回撤等指标 | `vectorbt` / `backtrader`、事件驱动 | @@ -1168,27 +1169,15 @@ volumes: #### 策略基类设计 ```python -class BaseStrategy(ABC): - """所有策略的基类""" +from common import BaseStrategy, StrategyConfig, Signal +from common.models import Kline, Ticker, OrderBook - def __init__(self, config: StrategyConfig): - self.config = config - self.position = 0.0 - self.pnl = 0.0 +class MyStrategy(BaseStrategy): + """策略示例 — 所有策略继承 BaseStrategy""" + strategy_type = "my_strategy" - @abstractmethod - async def on_ticker(self, ticker: Ticker) -> Signal | None: - """处理 ticker 数据,返回交易信号""" - ... - - @abstractmethod async def on_kline(self, kline: Kline) -> Signal | None: - """处理 K 线数据,返回交易信号""" ... - - async def on_orderbook(self, orderbook: OrderBook) -> Signal | None: - """处理深度数据(可选实现)""" - return None ``` --- @@ -1466,7 +1455,7 @@ async with redis.pubsub() as pubsub: | ---- | ------------------------------------------------------------ | ----------------------------------- | | 1.1 | 初始化 TypeScript 数据项目(`data/`),配置 `package.json` | 项目结构,ESLint + Prettier 配置 | | 1.2 | 初始化 Python 项目,配置 `poetry` / `uv` 依赖管理 | `pyproject.toml`,项目目录结构 | -| 1.3 | 定义共享类型:TypeScript `types/` + Python `common/models.py` | 双端对齐的数据模型 | +| 1.3 | 定义共享类型:TypeScript `types/` + Python `engine/common/models.py` | 双端对齐的数据模型 | | 1.4 | 实现 TS 交易所适配器基类 + Binance 适配器(`data/src/exchanges/`) | 统一接口 + WebSocket 连接 | | 1.5 | 实现 TS 行情采集器,WebSocket 订阅实时 ticker 和 K 线 | 实时行情流入 | | 1.6 | 实现 TS K 线合成器(`data/src/pipeline/kline-synthesizer.ts`) | 多周期 K 线实时合成 | @@ -1481,7 +1470,7 @@ async with redis.pubsub() as pubsub: | 步骤 | 任务 | 产出物 | | ---- | ------------------------------------------------------------ | -------------------------------- | -| 2.1 | 实现策略基类(`engine/base.py`) | `BaseStrategy` 抽象基类 | +| 2.1 | 实现策略基类(`engine/common/base.py`) | `BaseStrategy` 抽象基类 | | 2.2 | 实现策略管理器(`engine/manager.py`),支持策略注册和生命周期 | 策略热加载、启动/停止控制 | | 2.3 | 实现信号分发器(`engine/signals.py`) | 事件总线,策略到执行器的信号传递 | | 2.4 | 实现回测引擎(`engine/backtest.py`) | 历史数据回测,收益曲线、回撤等指标 | @@ -1590,11 +1579,16 @@ trade/ │ ├── engine/ # 🐍 Python 策略引擎 │ ├── __init__.py -│ ├── base.py # BaseStrategy 基类 -│ ├── manager.py # 策略管理器 -│ ├── signals.py # 信号分发器 -│ ├── backtest.py # 回测引擎 -│ └── optimizer.py # 参数优化器 +│ ├── env.yaml # 引擎环境配置 +│ ├── common/ # 引擎通用模块 +│ │ ├── __init__.py +│ │ ├── base.py # BaseStrategy 基类 +│ │ ├── models.py # 数据模型(Kline/Ticker/Trade/OrderBook) +│ │ └── logger.py # 结构化日志 +│ ├── manager.py # 策略管理器 +│ ├── signals.py # 信号分发器 +│ ├── backtest.py # 回测引擎 +│ └── optimizer.py # 参数优化器 │ ├── executor/ # 🐍 Python 交易执行模块 │ ├── __init__.py @@ -1632,12 +1626,11 @@ trade/ │ ├── grid_trading.py # 网格交易策略 │ └── arbitrage.py # 套利策略(可选) │ -├── common/ # 🐍 Python 公共工具模块 +├── common/ # 🐍 Python 公共基础设施(跨模块共享配置、工具等) │ ├── __init__.py -│ ├── logger.py # 日志配置 -│ ├── models.py # 数据模型(Pydantic,与 TS types 对应) -│ ├── constants.py # 常量定义 -│ └── utils.py # 工具函数 +│ ├── config.py # 全局配置 +│ ├── constants.py # 常量定义 +│ └── utils.py # 工具函数 │ ├── tests/ # 🐍 Python 测试 │ ├── __init__.py @@ -1662,7 +1655,7 @@ trade/ ``` ┌──────────────────┐ ┌──────────────┐ ┌─────────────┐ │ TS 数据模块 │ │ PostgreSQL │ │ Grafana │ -│ (Node.js 进程) │ │ (业务数据) │ │ (可视化) │ +│ (Bun 进程) │ │ (业务数据) │ │ (可视化) │ └──────┬───────────┘ └──────────────┘ └──────┬──────┘ │ │ │ ┌──────────────┐ │ diff --git a/engine/db_test.py b/engine/db_test.py new file mode 100644 index 0000000..a412e4a --- /dev/null +++ b/engine/db_test.py @@ -0,0 +1,147 @@ +""" +数据库 K 线读取测试 — 只读,从 TimescaleDB 读取各周期 K 线并打印 + +用法: + python db_test.py # 使用 env.yaml 中的 host + python db_test.py --host localhost # 覆盖 host(如 SSH 隧道后) +""" + +import asyncio +import sys + +import asyncpg + +from common.config import config as app_config + +# ── 各周期对应的表/视图 ── +INTERVAL_TABLES: dict[str, str] = { + "1m": "klines", + "5m": "klines_5m", + "15m": "klines_15m", + "30m": "klines_30m", + "1h": "klines_1h", + "4h": "klines_4h", + "1d": "klines_1d", + "1w": "klines_1w", +} + +LIMIT = 5 + + +def parse_args() -> str | None: + """解析命令行参数,返回 host 覆盖值""" + args = sys.argv[1:] + host_override = None + i = 0 + while i < len(args): + if args[i] == "--host" and i + 1 < len(args): + host_override = args[i + 1] + i += 2 + elif args[i].startswith("--host="): + host_override = args[i].split("=", 1)[1] + i += 1 + else: + i += 1 + return host_override + + +async def main(): + host_override = parse_args() + db = app_config.db + if host_override: + db = db.model_copy(update={"host": host_override}) + + dsn = f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" + + print(f"连接 {db.host}:{db.port}/{db.name} ...") + conn = await asyncpg.connect(dsn) + + try: + print() + print("=" * 85) + print(" TimescaleDB K 线数据读取测试(只读)") + print("=" * 85) + + for interval, table in INTERVAL_TABLES.items(): + # 检查表/视图是否存在(含 TimescaleDB 连续聚合视图) + exists = await conn.fetchval( + """ + SELECT EXISTS ( + SELECT 1 FROM pg_matviews WHERE matviewname = $1 + UNION + SELECT 1 FROM pg_tables WHERE tablename = $1 + UNION + SELECT 1 FROM pg_views WHERE viewname = $1 + ) + """, + table, + ) + + if not exists: + print(f"\n [{interval}] {table} — 不存在,跳过") + continue + + # 获取该表/视图的实际列名,避免查询不存在的列报错 + columns = await conn.fetch( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_name = $1 + ORDER BY ordinal_position + """, + table, + ) + col_names = {r["column_name"] for r in columns} + + # 选择存在的列进行查询 + select_cols = ["time", "exchange", "symbol", "interval", "open", "high", "low", "close", "volume"] + if "trade_count" in col_names: + select_cols.append("trade_count") + if "is_closed" in col_names: + select_cols.append("is_closed") + + rows = await conn.fetch( + f""" + SELECT {', '.join(select_cols)} + FROM {table} + WHERE interval = $1 + ORDER BY time DESC + LIMIT $2 + """, + interval, + LIMIT, + ) + + print(f"\n{'─' * 85}") + print(f" [{interval}] {table} — {len(rows)} 条") + print(f"{'─' * 85}") + + if not rows: + print(" (无数据)") + continue + + for k in rows: + t = k["time"].strftime("%Y-%m-%d %H:%M:%S") + is_closed = k.get("is_closed") + if is_closed is not None: + mark = "✓" if is_closed else "◌" + else: + mark = " " # 聚合视图无此字段 + trade_count = k.get("trade_count", "-") + print( + f" {t} [{mark}] {k['symbol']:10s} {k['interval']:4s}" + f" O={k['open']:>12.4f} H={k['high']:>12.4f}" + f" L={k['low']:>12.4f} C={k['close']:>12.4f}" + f" V={k['volume']:>10.4f} trades={trade_count}" + ) + + print(f"\n{'=' * 85}") + print(" 读取完成") + print("=" * 85) + + finally: + await conn.close() + + +if __name__ == "__main__": + asyncio.run(main())