From 10e13ae8daaa057ef1a0df5192bd3ab7311ff3a5 Mon Sep 17 00:00:00 2001 From: Rekey Date: Sat, 6 Jun 2026 19:56:01 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E5=88=9D=E5=A7=8B=E5=8C=96=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E9=AA=A8=E6=9E=B6=20=E2=80=94=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E4=BE=9D=E8=B5=96=E9=85=8D=E7=BD=AE=E3=80=81?= =?UTF-8?q?TimescaleDB=20=E5=BB=BA=E8=A1=A8=E8=84=9A=E6=9C=AC=E3=80=81Dock?= =?UTF-8?q?er=20Compose=20=E7=BC=96=E6=8E=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 5 + README.md | 1752 +++++++++++++++++++++++++++++++++++++ data/.env.example | 42 + data/init-db/001_init.sql | 228 +++++ data/package.json | 34 + data/tsconfig.json | 31 + docker-compose.yml | 28 + 7 files changed, 2120 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 data/.env.example create mode 100644 data/init-db/001_init.sql create mode 100644 data/package.json create mode 100644 data/tsconfig.json create mode 100644 docker-compose.yml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7f63181 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +node_modules/ +dist/ +.env +*.log +db/pgsql/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..5403f3e --- /dev/null +++ b/README.md @@ -0,0 +1,1752 @@ +# 数字货币量化交易系统开发计划 + +> 基于 Python + TypeScript 混合架构的数字货币量化交易系统 —— 核心模块划分与开发步骤 + +--- + +## 目录 + +1. [项目概述](#项目概述) +2. [技术栈选型](#技术栈选型) +3. [架构方案对比](#架构方案对比) +4. [核心模块划分](#核心模块划分) +5. [TypeScript 数据模块详解](#typescript-数据模块详解) +6. [开发步骤](#开发步骤) +7. [项目目录结构](#项目目录结构) +8. [部署与运维](#部署与运维) +9. [注意事项与风险提示](#注意事项与风险提示) + +--- + +## 项目概述 + +本项目的目标是构建一套**可扩展、低延迟、稳定可靠**的数字货币量化交易系统,支持: + +- 多交易所接入(Binance、OKX、Bybit 等) +- 多策略并行运行 +- 实时行情数据采集与存储 +- 策略回测与参数优化 +- 实盘交易执行与风控管理 +- 可视化监控与告警 + +### 架构策略 + +采用 **Python + TypeScript 混合架构**: + +| 层 | 语言 | 职责 | +|---|------|------| +| **数据层** | TypeScript (Node.js) | 行情采集、WebSocket 连接管理、K 线合成、数据写入 | +| **业务层** | Python 3.10+ | 策略引擎、回测、风控、交易执行 | +| **接口层** | TypeScript / Python | FastAPI (Python) 或 NestJS (TS) 提供 REST/WS API | + +--- + +## 技术栈选型 + +| 分类 | 技术 / 库 | 说明 | +| ------------ | ---------------------------------- | -------------------------------------- | +| **数据层语言** | **TypeScript 5.x (Node.js 20+)** | 行情采集、WebSocket 管理、数据管道 | +| **业务层语言** | **Python 3.10+** | 策略引擎、回测、风控逻辑 | +| **时序数据库** | **TimescaleDB (推荐)** | K 线数据存储,基于 PostgreSQL 扩展 | +| 关系型数据库 | PostgreSQL 16+ | 订单、策略配置、用户数据等(TimescaleDB 基于 PG,可共用)| +| 消息队列 | Redis / NATS | 跨语言数据流解耦,事件驱动 | +| 异步框架(TS) | `ws` + `axios` + `bull` | WebSocket 客户端、HTTP 请求、任务队列 | +| 异步框架(Py) | `asyncio` + `aiohttp` / `httpx` | 异步 I/O,高性能网络请求 | +| 数据分析 | `pandas` + `numpy` + `ta` / `talib` | 技术指标计算与数据分析 | +| 策略回测 | `backtrader` / `vectorbt` / 自研 | 高性能回测引擎 | +| Web 框架 | FastAPI + Uvicorn | REST API 与 WebSocket 服务 | +| 可视化 | Grafana + Streamlit | 监控仪表盘与交互式分析 | +| 任务调度 | Celery / APScheduler / Bull | 定时任务与异步任务队列 | +| 配置管理 | pydantic-settings + `.env` | 类型安全的配置管理 | +| 日志监控 | loguru + winston + sentry | 结构化日志与异常追踪 | +| **类型校验** | **Zod (TS) / Pydantic (Py)** | 运行时数据校验,双端类型一致性保证 | + +--- + +## 架构方案对比 + +### 方案 A:纯 Python(原方案) + +``` +[Python] 数据采集 → 策略引擎 → 交易执行 → 风控 +``` + +| 优势 | 劣势 | +|------|------| +| 单一语言,维护简单 | WebSocket 大规模连接管理不如 Node.js | +| 数据分析生态无敌 | Python GIL 在 CPU 密集型场景受限 | +| 量化金融社区成熟 | 异步生态相对 Node.js 较新 | + +### 方案 B:TypeScript 数据模块 + Python 业务引擎 ✅ **(推荐)** + +``` +[TypeScript] 数据采集 → Redis/消息队列 → [Python] 策略引擎 → 交易执行 +``` + +| 优势 | 劣势 | +|------|------| +| Node.js WebSocket 性能优异,天然适合高并发连接 | 需要维护两套技术栈 | +| TypeScript 类型系统在数据管道中减少运行时错误 | 跨语言调试稍复杂 | +| **共享类型**:前后端可用同一套 TypeScript 类型定义 | 部署需要 Node.js + Python 双运行时 | +| 事件驱动模型与行情数据流天然匹配 | 团队需要双语言能力 | +| npm 生态有成熟的交易 SDK(ccxt 等) | | + +### 方案 C:全 TypeScript + +``` +[TypeScript] 数据采集 → 策略引擎 → 交易执行 → 风控 +``` + +| 优势 | 劣势 | +|------|------| +| 单一语言 | 回测/量化分析生态远不如 Python | +| 类型安全 | 缺乏 pandas/numpy 级别的数据处理库 | +| 全栈一致性(前后端 + 数据层共用 TS) | 量化社区资源少,需要自研大量基础设施 | + +--- + +## 核心模块划分 + +系统划分为 **7 大核心模块**,模块间通过消息队列或 API 解耦,整体架构如下: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Web UI / Dashboard │ +│ (FastAPI / Streamlit / Grafana) │ +└──────────────┬──────────────────┬───────────────────────────┘ + │ │ + ┌──────────▼──────────┐ ┌───▼────────────────────┐ + │ API Gateway │ │ 监控告警模块 │ + │ (REST + WebSocket) │ │ (Prometheus + Alert) │ + └──────────┬──────────┘ └────────────────────────┘ + │ + ┌──────────▼──────────────────────────────────────────┐ + │ 策略引擎 🐍 Python │ + │ (策略加载 / 生命周期管理 / 信号分发) │ + └────┬──────────────┬──────────────┬─────────────────┘ + │ │ │ + ┌────▼────┐ ┌─────▼─────┐ ┌───▼──────────┐ + │ 策略 A │ │ 策略 B │ │ 策略 C ... │ + │(趋势) │ │ (网格) │ │ │ + └────┬────┘ └─────┬─────┘ └───┬──────────┘ + │ │ │ + ┌────▼──────────────▼──────────────▼─────────────────┐ + │ 交易执行模块 🐍 Python │ + │ (订单管理 / 仓位管理 / 交易所适配器 / 重试机制) │ + └────────────────────┬───────────────────────────────┘ + │ + ┌────────────────────▼───────────────────────────────┐ + │ 消息队列 (Redis / NATS) │ + │ ┌───────────── 数据管道 ─────────────┐ │ + └────────────────────┬───────────────────────────────┘ + │ + ┌────────────────────▼───────────────────────────────┐ + │ 数据模块 🟦 TypeScript │ + │ (行情采集 / K线合成 / 数据存储 / 数据清洗 / 因子计算) │ + └────────────────────┬───────────────────────────────┘ + │ + ┌────────────────────▼───────────────────────────────┐ + │ 风控模块 🐍 Python │ + │ (资金管理 / 仓位限制 / 最大回撤 / 异常交易熔断 / 白名单)│ + └─────────────────────────────────────────────────────┘ +``` + +--- + +### 1. 数据模块 (`data/`) — 🟦 TypeScript + +负责从交易所获取和处理市场数据,是一切策略和决策的基础。 + +#### 为什么数据模块用 TypeScript? + +| 原因 | 说明 | +|------|------| +| **WebSocket 性能** | Node.js 事件循环天然适合处理大量 WebSocket 连接(单进程万级连接) | +| **内存管理** | V8 引擎的垃圾回收机制在短生命周期对象场景表现优异 | +| **类型安全** | TypeScript 编译期检查确保数据管道中的类型正确性 | +| **流式处理** | Node.js Stream / Observable 模式与行情数据流完美匹配 | +| **生态优势** | 各交易所官方/社区 SDK 对 Node.js 支持最好 | + +#### 子模块划分 + +| 子模块 | 职责 | 关键技术点 | +| ---------------- | -------------------------------------------------------------------- | ----------------------------------------------- | +| **行情采集器** | 通过 WebSocket 订阅实时行情(ticker、orderbook、trade),定时拉取K线 | `ws`、断线重连、心跳检测、连接池管理 | +| **K 线合成器** | 将实时 tick/trade 流合成为 OHLCV K 线,支持多周期并行、补齐、异常处理 | 时间桶算法、增量更新、多级合成(1m→5m→15m→1h) | +| **数据存储** | K 线数据持久化到 TimescaleDB,订单数据存入 PostgreSQL | TimescaleDB hypertable、批量 UPSERT、列式压缩 | +| **数据清洗** | 去除异常 tick、填充缺失值、处理停盘数据 | 统计异常检测、插值法 | +| **数据发布** | 将处理后的数据通过消息队列推送给 Python 策略引擎 | Redis Pub/Sub、NATS、协议缓冲(Protocol Buffers) | + +#### 关键接口 + +```typescript +// src/data/collector.ts +export interface DataFeed { + subscribeTicker(symbols: string[]): Promise; + subscribeOrderbook(symbol: string, depth: number): Promise; + fetchKlines(symbol: string, interval: KlineInterval, limit?: number): Promise; +} + +// src/data/types.ts +export interface Ticker { + exchange: string; + symbol: string; + price: number; + volume: number; + timestamp: number; +} + +export interface Kline { + exchange: string; + symbol: string; + interval: KlineInterval; + open: number; + high: number; + low: number; + close: number; + volume: number; + timestamp: number; +} + +// src/data/pipeline.ts +export class DataPipeline { + constructor( + private collector: DataFeed, + private klineSynthesizer: KlineSynthesizer, + private storage: DataStorage, + private publisher: DataPublisher, + ) {} + + async start(): Promise { + // 启动数据流管道 + const tickerStream = await this.collector.subscribeTicker(['BTCUSDT', 'ETHUSDT']); + tickerStream + .pipe(this.klineSynthesizer.transform()) + .pipe(this.storage.writeStream()) + .pipe(this.publisher.publishStream()); + } +} +``` + +--- + +#### K 线合成器详解 + +K 线合成器是数据模块中最核心的组件之一,负责将交易所推送的 **实时离散数据** 合成为标准的 **OHLCV K 线**。以下是其完整的工作机制。 + +##### 1. 输入数据源 + +合成器接收三种类型的原始数据: + +| 数据源 | 交易所推送频率 | 数据量 | 用途 | +|--------|---------------|--------|------| +| **Ticker**(最新成交价) | 100ms~1s/次 | 中等 | 实时价格监控,快速 K 线更新 | +| **Trade**(逐笔成交) | 毫秒级 | 极大 | 精确 K 线合成(推荐) | +| **K 线(原始)** | 1s~1min(交易所合成好的) | 小 | 直接使用,无需合成(最快方案) | + +> **推荐方案**:使用 `Trade`(逐笔成交)作为主数据源合成 K 线,精度最高;`Ticker` 作为辅助用于快速预览。 + +##### 2. 核心算法:时间桶(Time Bucket) + +``` +时间轴(1分钟 K 线为例): + + 时间桶 [t0, t0+1min) + ┌─────────────────────────────────────────┐ + │ Trade A Trade B Trade C │ + │ p: 50000 p: 50020 p: 50010 │ + │ v: 0.5 v: 1.2 v: 0.8 │ + │ t: 00:10 t: 00:25 t: 00:45 │ + │ │ + │ OHLCV = { O: 50000, H: 50020, │ + │ L: 50000, C: 50010, │ + │ V: 2.5 } │ + └─────────────────────────────────────────┘ +``` + +**算法步骤**: + +``` +每收到一条 Trade/Ticker: + 1. 计算时间桶索引:bucketIndex = floor(timestamp / interval_ms) + + 2. 如果 bucketIndex != currentBucketIndex: + a. 关闭当前桶,emit K 线 + b. 创建新桶,初始化 O=H=L=C=当前价格, V=当前成交量 + + 3. 否则(仍在当前桶内): + a. O = 不变(桶内第一条的价格) + b. H = max(H, 当前价格) + c. L = min(L, 当前价格) + d. C = 当前价格 + e. V += 当前成交量 +``` + +##### 3. 多周期并行合成 + +系统需要同时维护多个周期的 K 线(1m、5m、15m、1h...),有两种实现策略: + +**策略 A:独立合成(每个周期独立计算)** ✅ 推荐 + +``` +Trade 流 ──► 1m 合成器 ──► 1m K 线 + ├──► 5m 合成器 ──► 5m K 线 + ├──► 15m 合成器 ─► 15m K 线 + └──► 1h 合成器 ──► 1h K 线 +``` + +- 每个周期维护独立的时间桶 +- 优点:实现简单,各周期互不影响 +- 缺点:内存占用随周期数量线性增长 + +**策略 B:多级合成(从低周期聚合为高周期)** + +``` +Trade 流 ──► 1m 合成器 ──► 5m 合成器 ──► 15m 合成器 ──► 1h 合成器 + ↑ 由 5 根 1m 合成 ↑ 由 3 根 5m 合成 +``` + +- 高周期 K 线由低周期 K 线聚合而成 +- 优点:内存效率高,无需为每个周期维护独立的 Trade 缓存 +- 缺点:低周期 K 线未完成时,高周期无法产生完整 K 线 + +**混合策略(推荐)**: + +``` +Trade 流 ──► 1m 合成器(实时,数据源为 Trade) + │ + ▼ + 1m K 线 ──► 5m 聚合器(由 5 根 1m 聚合) + │ + ▼ + 5m K 线 ──► 15m 聚合器 + │ + ▼ + 15m K 线 ──► 1h 聚合器 +``` + +1m 用 Trade 实时合成(精度最高),5m 及以上从 1m 聚合(计算量最小)。 + +##### 4. K 线补齐与边界处理 + +这是合成器最容易被忽视但极其重要的部分: + +| 场景 | 问题 | 处理方案 | +|------|------|----------| +| **无成交周期** | 某个 1m 区间内没有任何 Trade | 使用前一根 K 线的 Close 作为当前 K 线的 OHLC(平盘),Volume=0 | +| **交易所暂停** | 交易所临时停盘或维护 | 停止 emit 新 K 线,标记为 `gap: true`,策略端跳过 | +| **周期边界错位** | 交易所 1h K 线从整点开始,本地合成可能有时区偏移 | 使用 UTC 时间统一对齐,`bucketIndex = floor(utcTimestamp / intervalMs) * intervalMs` | +| **首根 K 线不完整** | 系统启动时间不在周期起点,第一根 K 线数据不足 | emit 时标记 `isPartial: true`,策略端可选择忽略或使用 | +| **数据延迟/乱序** | Trade 到达顺序与发生顺序不一致 | 缓冲区保留 100ms 的排序窗口,按 timestamp 排序后处理 | + +##### 5. 增量更新 vs 完整替换 + +```typescript +// 增量更新模式(推荐)—— 只推送变化部分,大幅减少数据量 +interface KlineDelta { + exchange: string; + symbol: string; + interval: KlineInterval; + bucketTime: number; // 时间桶起始时间戳(毫秒) + o: number; // Open(只在开桶时更新) + h: number; // High + l: number; // Low + c: number; // Close + v: number; // Volume + isClosed: boolean; // 当前桶是否已关闭(不再变化) + tradeCount: number; // 桶内的成交笔数 +} + +// 完整 K 线(用于回测和历史数据) +interface Kline { + exchange: string; + symbol: string; + interval: KlineInterval; + openTime: number; + closeTime: number; + open: number; + high: number; + low: number; + close: number; + volume: number; + quoteVolume: number; // 成交额 + takerBuyBaseVolume: number; // 主动买入量 + takerBuyQuoteVolume: number; // 主动买入额 + tradeCount: number; // 成交笔数 +} +``` + +##### 6. 完整实现示例 + +```typescript +// src/data/pipeline/kline-synthesizer.ts +import { Subject, interval } from 'rxjs'; +import { bufferTime, filter, map } from 'rxjs/operators'; + +type KlineInterval = '1m' | '5m' | '15m' | '30m' | '1h' | '4h' | '1d'; + +const INTERVAL_MS: Record = { + '1m': 60_000, + '5m': 300_000, + '15m': 900_000, + '30m': 1_800_000, + '1h': 3_600_000, + '4h': 14_400_000, + '1d': 86_400_000, +}; + +interface Trade { + price: number; + amount: number; + timestamp: number; +} + +interface KlineBucket { + openTime: number; + open: number; + high: number; + low: number; + close: number; + volume: number; + tradeCount: number; + isClosed: boolean; +} + +export class KlineSynthesizer { + private buckets = new Map(); + + /** + * 核心合成方法:输入一条 Trade,输出(可能)闭合的 K 线 + */ + synthesize( + symbol: string, + interval: KlineInterval, + trade: Trade, + ): KlineBucket | null { + const intervalMs = INTERVAL_MS[interval]; + const bucketTime = + Math.floor(trade.timestamp / intervalMs) * intervalMs; + const key = `${symbol}:${interval}:${bucketTime}`; + + let bucket = this.buckets.get(key); + + if (!bucket) { + // 新时间桶:初始化 + bucket = { + openTime: bucketTime, + open: trade.price, + high: trade.price, + low: trade.price, + close: trade.price, + volume: trade.amount, + tradeCount: 1, + isClosed: false, + }; + this.buckets.set(key, bucket); + return null; // 新桶第一条,暂不 emit + } + + // 更新当前桶 + bucket.high = Math.max(bucket.high, trade.price); + bucket.low = Math.min(bucket.low, trade.price); + bucket.close = trade.price; + bucket.volume += trade.amount; + bucket.tradeCount += 1; + + return null; // 桶未关闭 + } + + /** + * 关闭过期桶(由定时器调用),返回所有已关闭的 K 线 + */ + closeExpiredBuckets(now: number): KlineBucket[] { + const closed: KlineBucket[] = []; + for (const [key, bucket] of this.buckets.entries()) { + if (!bucket.isClosed && now >= bucket.openTime + INTERVAL_MS['1m']) { + bucket.isClosed = true; + closed.push(bucket); + this.buckets.delete(key); + } + } + return closed; + } + + /** + * RxJS 操作符:将 Trade 流转换为 K 线流 + */ + transform(interval: KlineInterval) { + return (source: Subject) => { + const output = new Subject(); + + // 每笔 Trade 更新桶 + source.subscribe((trade) => { + this.synthesize('BTCUSDT', interval, trade); + }); + + // 定时检查过期桶(每秒一次) + interval(1000).subscribe(() => { + const closed = this.closeExpiredBuckets(Date.now()); + closed.forEach((k) => output.next(k)); + }); + + return output; + }; + } +} +``` + +##### 7. 性能考量 + +| 指标 | 数据量 | 说明 | +|------|--------|------| +| **单币种 Trade/秒** | ~50~200 tps | BTCUSDT 高峰期 | +| **单币种 Ticker/秒** | ~10 tps | Binance WebSocket 推送频率 | +| **内存开销/币种** | ~1~5 MB | 100 个并行时间桶 | +| **单笔合成耗时** | < 1 μs | Node.js V8 优化后 | +| **10 币种 5 周期并行** | ~500 μs/轮 | 全部更新一次 | + +> **优化建议**:对于高频币种(BTC/ETH),可以采用 Trade 合成 + 1m K 线直接复用交易所推送的原始 K 线(无需自行合成)。 + +##### 8. 与交易所 K 线的差异处理 + +| 差异项 | 交易所推送 K 线 | 本地合成 K 线 | +|--------|---------------|-------------| +| **延迟** | 通常延迟 1~5 秒 | 实时(Trade 到达即更新) | +| **精度** | 包含所有成交数据 | 取决于 WebSocket 推送的 Trade 覆盖率 | +| **完整性** | 最终数据,不可变 | 可能因断线导致缺失(需补齐) | +| **自定义周期** | 仅支持标准周期 | 支持任意周期(如 3m、7m、自定义) | +| **多交易所对齐** | 各交易所时间不同 | 使用 UTC 统一对齐 | + +> **最佳实践**:本地合成 K 线用于 **实时策略决策**,交易所 K 线用于 **回测和历史分析**。两者结合使用。 + +--- + +#### TimescaleDB 数据存储方案 + +K 线数据具有典型的时序特征:**持续写入、按时间范围查询、少有更新、数据量大**。TimescaleDB 作为 PostgreSQL 的时序扩展,相比 InfluxDB 有以下核心优势: + +| 对比维度 | TimescaleDB | InfluxDB | +|---------|------------|----------| +| **基础数据库** | PostgreSQL 扩展 | 独立时序数据库 | +| **SQL 兼容** | 完整 SQL 支持(JOIN、子查询、窗口函数) | 类 SQL 但不完全兼容 | +| **与业务数据关联** | K 线表和订单表可以在同一个 PG 实例中 JOIN | 需要跨数据库查询 | +| **数据压缩** | 列式压缩,压缩比 90%+ | 压缩比约 85% | +| **连续聚合** | 内置自动刷新物化视图 | 需要外部工具 | +| **保留策略** | 内置自动删除(数据保留策略) | 内置 | +| **生态集成** | 可与 PostGIS、pgvector 等 PG 扩展共存 | 独立生态 | +| **运维成本** | 复用 PG 运维经验 | 需独立运维 | + +##### 1. 表结构设计 + +```sql +-- 创建 TimescaleDB 扩展 +CREATE EXTENSION IF NOT EXISTS timescaledb; +CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit; + +-- ============================================ +-- 1. K 线主表(hypertable) +-- ============================================ +CREATE TABLE klines ( + time TIMESTAMPTZ NOT NULL, -- K 线开盘时间 + exchange TEXT NOT NULL, -- 交易所 (binance/okx/bybit) + symbol TEXT NOT NULL, -- 交易对 (BTCUSDT/ETHUSDT) + interval TEXT NOT NULL, -- 周期 (1m/5m/15m/1h/4h/1d) + + -- OHLCV + open NUMERIC(20,8) NOT NULL, + high NUMERIC(20,8) NOT NULL, + low NUMERIC(20,8) NOT NULL, + close NUMERIC(20,8) NOT NULL, + volume NUMERIC(20,8) NOT NULL, + + -- 扩展字段 + quote_volume NUMERIC(20,8), -- 成交额(计价币种) + taker_buy_base_vol NUMERIC(20,8), -- 主动买入成交量 + taker_buy_quote_vol NUMERIC(20,8), -- 主动买入成交额 + trade_count INTEGER, -- 成交笔数 + is_closed BOOLEAN DEFAULT TRUE, -- K 线是否已关闭 + + -- 元数据 + created_at TIMESTAMPTZ DEFAULT NOW(), -- 记录创建时间 + updated_at TIMESTAMPTZ DEFAULT NOW(), -- 最后更新时间 + + -- 分区键 + -- time 是分区列,symbol + interval 是分布列 + UNIQUE (time, exchange, symbol, interval) +); + +-- 转换为 hypertable(按时间和空间分区) +SELECT create_hypertable( + 'klines', + 'time', -- 时间分区列 + chunk_time_interval => INTERVAL '1 day', -- 每分区 1 天数据 + partitioning_column => 'exchange', -- 空间分区列 + number_partitions => 4, -- 4 个空间分区 + if_not_exists => TRUE +); + +-- ============================================ +-- 2. 索引设计 +-- ============================================ + +-- 查询最频繁:按交易对+周期+时间范围查 +CREATE INDEX idx_klines_lookup + ON klines (exchange, symbol, interval, time DESC); + +-- 回测查询:按交易对+周期+时间范围 +CREATE INDEX idx_klines_backtest + ON klines (symbol, interval, time ASC); + +-- 最新 K 线查询 +CREATE INDEX idx_klines_latest + ON klines (exchange, symbol, interval, time DESC) + WHERE is_closed = TRUE; + +-- ============================================ +-- 3. 数据压缩策略 +-- ============================================ + +-- 启用压缩(已关闭的 K 线自动压缩) +ALTER TABLE klines SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'exchange, symbol, interval', + timescaledb.compress_orderby = 'time DESC' +); + +-- 自动压缩策略:K 线关闭 7 天后压缩 +SELECT add_compression_policy('klines', INTERVAL '7 days', if_not_exists => TRUE); + +-- ============================================ +-- 4. 数据保留策略 +-- ============================================ + +-- 1m K 线保留 30 天 +SELECT add_retention_policy('klines', INTERVAL '30 days', if_not_exists => TRUE); +``` + +##### 2. 连续聚合(Continuous Aggregates) + +连续聚合是 TimescaleDB 最强大的功能之一 —— 自动从低周期 K 线聚合为高周期 K 线,无需手动维护。 + +```sql +-- ============================================ +-- 5m K 线(从 1m 自动聚合) +-- ============================================ +CREATE MATERIALIZED VIEW klines_5m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('5 minutes', time) AS time, + exchange, + symbol, + '5m'::TEXT AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count) AS trade_count +FROM klines +WHERE interval = '1m' +GROUP BY time_bucket('5 minutes', time), exchange, symbol; + +-- 自动刷新策略(每 1 分钟刷新最近 10 分钟数据) +SELECT add_continuous_aggregate_policy('klines_5m', + start_offset => INTERVAL '1 day', + end_offset => INTERVAL '10 minutes', -- 留 10 分钟给延迟数据 + schedule_interval => INTERVAL '1 minute', + if_not_exists => TRUE +); + +-- ============================================ +-- 15m K 线 +-- ============================================ +CREATE MATERIALIZED VIEW klines_15m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('15 minutes', time) AS time, + exchange, symbol, '15m'::TEXT AS interval, + FIRST(open, time), MAX(high), MIN(low), LAST(close, time), + SUM(volume), SUM(quote_volume), + SUM(taker_buy_base_vol), SUM(taker_buy_quote_vol), + SUM(trade_count) +FROM klines WHERE interval = '1m' +GROUP BY time_bucket('15 minutes', time), exchange, symbol; + +SELECT add_continuous_aggregate_policy('klines_15m', + start_offset => INTERVAL '2 days', + end_offset => INTERVAL '30 minutes', + schedule_interval => INTERVAL '5 minutes', + if_not_exists => TRUE +); + +-- ============================================ +-- 1h K 线 +-- ============================================ +CREATE MATERIALIZED VIEW klines_1h +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 hour', time) AS time, + exchange, symbol, '1h'::TEXT AS interval, + FIRST(open, time), MAX(high), MIN(low), LAST(close, time), + SUM(volume), SUM(quote_volume), + SUM(taker_buy_base_vol), SUM(taker_buy_quote_vol), + SUM(trade_count) +FROM klines WHERE interval = '1m' +GROUP BY time_bucket('1 hour', time), exchange, symbol; + +SELECT add_continuous_aggregate_policy('klines_1h', + start_offset => INTERVAL '3 days', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '5 minutes', + if_not_exists => TRUE +); + +-- ============================================ +-- 1d K 线(含周线、月线可在查询时用 time_bucket) +-- ============================================ +CREATE MATERIALIZED VIEW klines_1d +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 day', time) AS time, + exchange, symbol, '1d'::TEXT AS interval, + FIRST(open, time), MAX(high), MIN(low), LAST(close, time), + SUM(volume), SUM(quote_volume), + SUM(taker_buy_base_vol), SUM(taker_buy_quote_vol), + SUM(trade_count) +FROM klines WHERE interval = '1m' +GROUP BY time_bucket('1 day', time), exchange, symbol; + +SELECT add_continuous_aggregate_policy('klines_1d', + start_offset => INTERVAL '7 days', + end_offset => INTERVAL '2 hours', + schedule_interval => INTERVAL '1 hour', + if_not_exists => TRUE +); +``` + +**查询示例**: + +```sql +-- 查询 BTC 最近 100 根 1h K 线(自动走连续聚合) +SELECT * FROM klines_1h +WHERE symbol = 'BTCUSDT' + AND exchange = 'binance' +ORDER BY time DESC +LIMIT 100; + +-- 查询 BTC 1h K 线的周线 +SELECT + time_bucket('1 week', time) AS week, + FIRST(open, time), MAX(high), MIN(low), LAST(close, time), + SUM(volume) +FROM klines_1h +WHERE symbol = 'BTCUSDT' +GROUP BY week +ORDER BY week DESC; + +-- 统计最近 7 天各币种波动率 +SELECT + symbol, + MAX(high) / MIN(low) - 1 AS volatility +FROM klines_1h +WHERE time > NOW() - INTERVAL '7 days' +GROUP BY symbol +ORDER BY volatility DESC; +``` + +##### 3. 空间与时间分区原理 + +``` +hypertable: klines +┌─────────────────────────────────────────────────────┐ +│ 时间线 │ +│ │ +│ chunk_1 (2024-01-01) chunk_2 (2024-01-02) ... │ +│ ┌──────────────────┐ ┌──────────────────┐ │ +│ │ exchange=A │ │ exchange=A │ │ +│ │ exchange=B │ │ exchange=B │ │ +│ │ exchange=C │ │ exchange=C │ │ +│ │ exchange=D │ │ exchange=D │ │ +│ └──────────────────┘ └──────────────────┘ │ +│ │ +│ 每个 chunk 内部按 exchange 做空间分区 │ +│ chunk 大小 ≈ 1 天数据(自动调整) │ +└─────────────────────────────────────────────────────┘ +``` + +| 参数 | 值 | 说明 | +|------|----|------| +| `chunk_time_interval` | `1 day` | 每个 chunk 存储 1 天数据,约 100~500MB | +| `number_partitions` | 4 | 按交易所分区,4 个空间分区 | +| 空间分区列 | `exchange` | Binance/OKX/Bybit 等分布在不同分区 | + +##### 4. TypeScript 写入实现 + +```typescript +// data/src/storage/timescaledb.ts +import { Pool } from 'pg'; + +interface KlineRecord { + time: Date; + exchange: string; + symbol: string; + interval: string; + open: number; + high: number; + low: number; + close: number; + volume: number; + quoteVolume: number; + takerBuyBaseVol: number; + takerBuyQuoteVol: number; + tradeCount: number; + isClosed: boolean; +} + +export class TimescaleDBStorage { + private pool: Pool; + private batchBuffer: KlineRecord[] = []; + private batchSize: number; + private flushInterval: number; + + constructor(config: { + connectionString: string; + batchSize?: number; // 批量写入条数,默认 500 + flushIntervalMs?: number; // 最大等待时间,默认 1000ms + }) { + this.pool = new Pool({ connectionString: config.connectionString }); + this.batchSize = config.batchSize ?? 500; + this.flushInterval = config.flushIntervalMs ?? 1000; + + // 定时刷新缓冲区 + setInterval(() => this.flush(), this.flushInterval); + } + + /** + * 写入单条 K 线(加入缓冲区,批量写入) + */ + async write(kline: KlineRecord): Promise { + this.batchBuffer.push(kline); + + if (this.batchBuffer.length >= this.batchSize) { + await this.flush(); + } + } + + /** + * 批量刷新缓冲区 + */ + async flush(): Promise { + if (this.batchBuffer.length === 0) return; + + const batch = this.batchBuffer.splice(0, this.batchSize); + const client = await this.pool.connect(); + + try { + // 使用 UNLOGGED 表写入时跳过 WAL 日志(提升写入性能) + await client.query('SET LOCAL timescaledb.enable_skip_scan = ON'); + + // 批量 UPSERT:已存在的 K 线只更新 close/high/low(增量更新) + const values = batch.map((k, i) => { + const offset = i * 14; + return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, + $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, + $${offset + 9}, $${offset + 10}, $${offset + 11}, $${offset + 12}, + $${offset + 13}, $${offset + 14})`; + }).join(','); + + const params = batch.flatMap(k => [ + k.time, k.exchange, k.symbol, k.interval, + k.open, k.high, k.low, k.close, + k.volume, k.quoteVolume, + k.takerBuyBaseVol, k.takerBuyQuoteVol, + k.tradeCount, k.isClosed, + ]); + + await client.query(` + INSERT INTO klines ( + time, exchange, symbol, interval, + open, high, low, close, + volume, quote_volume, + taker_buy_base_vol, taker_buy_quote_vol, + trade_count, is_closed + ) VALUES ${values} + ON CONFLICT (time, exchange, symbol, interval) DO UPDATE SET + high = GREATEST(klines.high, EXCLUDED.high), + low = LEAST(klines.low, EXCLUDED.low), + close = EXCLUDED.close, + volume = klines.volume + EXCLUDED.volume, + trade_count = klines.trade_count + EXCLUDED.trade_count, + is_closed = EXCLUDED.is_closed, + updated_at = NOW() + `, params); + } finally { + client.release(); + } + } + + /** + * 查询 K 线 + */ + async query(options: { + exchange: string; + symbol: string; + interval: string; + startTime: Date; + endTime: Date; + limit?: number; + }): Promise { + const result = await this.pool.query(` + SELECT * FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time >= $4 + AND time <= $5 + ORDER BY time ASC + LIMIT $6 + `, [ + options.exchange, + options.symbol, + options.interval, + options.startTime, + options.endTime, + options.limit ?? 1000, + ]); + return result.rows; + } + + async close(): Promise { + await this.flush(); + await this.pool.end(); + } +} +``` + +##### 5. 写入性能优化策略 + +| 策略 | 说明 | 效果 | +|------|------|------| +| **批量写入** | 每 500 条或 1 秒批量写入一次 | 减少 99% 的数据库连接开销 | +| **UPSERT** | 用 `ON CONFLICT DO UPDATE` 处理同一 K 线的增量更新 | 避免重复写入 | +| **连接池** | `pg.Pool` 管理 10~20 个连接 | 控制并发,避免 PG 连接暴涨 | +| **压缩** | 关闭 7 天后自动压缩 | 存储空间减少 90%+ | +| **chunk 对齐** | chunk 按天分区,避免跨分区查询 | 查询性能提升 10x | +| **索引** | 精确的复合索引覆盖所有查询模式 | 避免全表扫描 | + +##### 6. Python 读取实现(策略引擎侧) + +```python +# common/storage.py +import asyncpg +from datetime import datetime +from typing import Optional + +class TimescaleDBReader: + """Python 策略引擎侧的 K 线读取器""" + + def __init__(self, dsn: str): + self.dsn = dsn + self.pool: Optional[asyncpg.Pool] = None + + async def connect(self): + self.pool = await asyncpg.create_pool( + self.dsn, + min_size=5, + max_size=20, + command_timeout=10, + ) + + async def get_klines( + self, + symbol: str, + interval: str, + start_time: datetime, + end_time: datetime, + exchange: str = "binance", + limit: int = 1000, + ) -> list[dict]: + """ + 获取 K 线数据(自动路由到连续聚合视图) + + 当 interval >= '1h' 时,自动使用连续聚合视图查询, + 性能远优于扫描原始 1m 表。 + """ + # 自动路由到对应的物化视图 + view_map = { + "5m": "klines_5m", "15m": "klines_15m", + "30m": "klines_30m", "1h": "klines_1h", + "4h": "klines_4h", "1d": "klines_1d", + } + table = view_map.get(interval, "klines") + + async with self.pool.acquire() as conn: + rows = await conn.fetch( + f""" + SELECT + time, + open, + high, + low, + close, + volume, + quote_volume, + trade_count + FROM {table} + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND time >= $4 + AND time <= $5 + ORDER BY time ASC + LIMIT $6 + """, + exchange, symbol, interval, + start_time, end_time, limit, + ) + return [dict(row) for row in rows] + + async def get_latest_kline( + self, + symbol: str, + interval: str, + exchange: str = "binance", + ) -> Optional[dict]: + """获取最新一根 K 线""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT * FROM klines + WHERE exchange = $1 + AND symbol = $2 + AND interval = $3 + AND is_closed = TRUE + ORDER BY time DESC + LIMIT 1 + """, + exchange, symbol, interval, + ) + return dict(row) if row else None + + async def get_ohlcv_batch( + self, + symbol: str, + interval: str, + limit: int = 200, + exchange: str = "binance", + ) -> list[tuple]: + """ + 获取 OHLCV 数组(直接用于 pandas DataFrame 或 TA-Lib 计算) + + 返回格式: [(timestamp, open, high, low, close, volume), ...] + """ + view_map = { + "5m": "klines_5m", "15m": "klines_15m", + "1h": "klines_1h", "4h": "klines_4h", "1d": "klines_1d", + } + table = view_map.get(interval, "klines") + + async with self.pool.acquire() as conn: + rows = await conn.fetch( + f""" + SELECT time, open, high, low, close, volume + FROM {table} + WHERE exchange = $1 AND symbol = $2 AND interval = $3 + ORDER BY time DESC + LIMIT $4 + """, + exchange, symbol, interval, limit, + ) + # 正序返回 + return [ + (row['time'], row['open'], row['high'], + row['low'], row['close'], row['volume']) + for row in reversed(rows) + ] + + async def close(self): + if self.pool: + await self.pool.close() +``` + +##### 7. 磁盘空间估算 + +```sql +-- 单币种单交易所 1m K 线行数 +-- 1天 = 1440 行,1月 ≈ 43200 行,1年 ≈ 525600 行 + +-- 无压缩时: +-- 1 行 ≈ 180 bytes +-- 10 币种 × 1 年 × 180 B = 945 MB + +-- 启用压缩后(压缩比约 92%): +-- 10 币种 × 1 年 × 180 B × 8% ≈ 75 MB +-- 50 币种 × 1 年 ≈ 375 MB +``` + +| 数据量 | 未压缩 | 压缩后 (92%) | 连续聚合额外 | +|--------|--------|-------------|------------| +| 10 币种 / 1 年 | ~945 MB | ~75 MB | ~30 MB | +| 50 币种 / 1 年 | ~4.7 GB | ~375 MB | ~150 MB | +| 200 币种 / 1 年 | ~18.9 GB | ~1.5 GB | ~600 MB | + +##### 8. Docker Compose 配置 + +```yaml +# docker-compose.yml +version: '3.8' + +services: + timescaledb: + image: timescale/timescaledb:2-pg16 + container_name: trade-timescaledb + restart: unless-stopped + ports: + - "5432:5432" + environment: + POSTGRES_DB: trade + POSTGRES_USER: trader + POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme} + volumes: + - timescaledb-data:/var/lib/postgresql/data + - ./data/init-db:/docker-entrypoint-initdb.d # 自动执行建表 SQL + command: > + -c shared_buffers=1GB + -c effective_cache_size=3GB + -c maintenance_work_mem=256MB + -c work_mem=64MB + -c wal_buffers=64MB + -c random_page_cost=1.1 + -c effective_io_concurrency=200 + -c max_connections=50 + healthcheck: + test: ["CMD-SHELL", "pg_isready -U trader -d trade"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + timescaledb-data: +``` + +> **初始化脚本**:将上述建表 SQL 保存为 `data/init-db/001_init.sql`,容器首次启动时自动执行。 + +--- + +#### 推荐的 npm 包 + +| 包名 | 用途 | +|------|------| +| `ws` | WebSocket 客户端(轻量、高性能) | +| `ccxt` | 统一交易所 API(支持 100+ 交易所) | +| `pg` | PostgreSQL 客户端(TimescaleDB 兼容) | +| `ioredis` | Redis 客户端(支持集群、哨兵) | +| `zod` | 运行时数据校验,与 Python Pydantic 对应 | +| `pino` | 高性能结构化日志 | +| `rxjs` | 响应式编程,优雅处理数据流 | +| `bull` / `bullmq` | Redis 任务队列,用于定时采集任务 | +| `technicalindicators` | 技术指标计算(备选) | + +--- + +### 2. 策略引擎 (`engine/`) — 🐍 Python + +核心调度中心,负责策略的生命周期管理和信号分发。 + +#### 子模块划分 + +| 子模块 | 职责 | 关键技术点 | +| -------------- | ---------------------------------------------- | -------------------------------------- | +| **策略管理器** | 策略注册、启动、停止、热加载 | 插件化架构、动态导入、`importlib` | +| **信号分发器** | 将策略产生的交易信号分发到交易执行模块 | 事件总线、消息队列 | +| **回测引擎** | 使用历史数据模拟策略执行,评估收益、回撤等指标 | `vectorbt` / `backtrader`、事件驱动 | +| **参数优化器** | 网格搜索 / 贝叶斯优化策略参数 | `scipy.optimize`、`optuna`、并行计算 | + +#### 策略基类设计 + +```python +class BaseStrategy(ABC): + """所有策略的基类""" + + def __init__(self, config: StrategyConfig): + self.config = config + self.position = 0.0 + self.pnl = 0.0 + + @abstractmethod + async def on_ticker(self, ticker: Ticker) -> Signal | None: + """处理 ticker 数据,返回交易信号""" + ... + + @abstractmethod + async def on_kline(self, kline: Kline) -> Signal | None: + """处理 K 线数据,返回交易信号""" + ... + + async def on_orderbook(self, orderbook: OrderBook) -> Signal | None: + """处理深度数据(可选实现)""" + return None +``` + +--- + +### 3. 交易执行模块 (`executor/`) — 🐍 Python + +负责将策略信号转化为实际订单,管理交易所连接和仓位。虽然数据采集在 TypeScript 层完成,但下单操作仍需在 Python 侧执行,因为策略引擎和风控逻辑同在 Python 侧,减少跨语言通信延迟。 + +#### 子模块划分 + +| 子模块 | 职责 | 关键技术点 | +| ---------------- | ------------------------------------------------------ | ---------------------------------------- | +| **交易所适配器** | 封装不同交易所的 REST / WebSocket API,提供统一接口 | 适配器模式、限频控制、签名算法 | +| **订单管理器** | 下单、撤单、查询订单状态;支持限价单、市价单、条件单 | 订单状态机、幂等性、超时处理 | +| **仓位管理器** | 跟踪当前持仓、可用余额、未实现盈亏 | 实时同步、增量更新 | +| **执行算法** | TWAP、VWAP、冰山订单等高级执行算法(降低滑点) | 切片算法、时间加权 | + +#### 交易所适配器接口 + +```python +class ExchangeAdapter(ABC): + """交易所统一适配器接口""" + + @abstractmethod + async def create_order(self, order: Order) -> OrderResult: + ... + + @abstractmethod + async def cancel_order(self, order_id: str) -> bool: + ... + + @abstractmethod + async def get_balance(self) -> dict[str, float]: + ... + + @abstractmethod + async def get_position(self, symbol: str) -> Position: + ... +``` + +--- + +### 4. 风控模块 (`risk/`) — 🐍 Python + +系统的安全防线,在交易前、交易中、交易后全链路控制风险。 + +| 功能 | 说明 | +| ------------------ | ------------------------------------------------------------------ | +| **资金管理** | 单笔下单量限制、总仓位比例限制、逐仓/全仓模式支持 | +| **最大回撤控制** | 当日/累计回撤超过阈值时自动暂停策略 | +| **异常检测** | 检测价格异常波动、交易所 API 异常、网络延迟过高 | +| **熔断机制** | 极端行情下自动停止所有交易,进入只平不开模式 | +| **订单频率限制** | 控制单位时间内的下单次数,防止过度交易 | +| **白名单/黑名单** | 限制可交易的币种和交易对 | + +```python +class RiskManager: + async def check_order(self, order: Order, context: TradingContext) -> RiskResult: + """下单前风控检查""" + checks = [ + self._check_max_position, + self._check_order_value, + self._check_order_frequency, + self._check_drawdown_limit, + self._check_price_deviation, + ] + for check in checks: + result = await check(order, context) + if not result.passed: + return result + return RiskResult(passed=True) +``` + +--- + +### 5. 监控告警模块 (`monitor/`) + +保障系统运行的可观测性。 + +| 功能 | 说明 | +| ---------------- | -------------------------------------------------- | +| **指标收集** | 收集策略收益、胜率、夏普比率、最大回撤等绩效指标 | +| **系统监控** | CPU / 内存 / 网络延迟 / API 调用量 | +| **告警通知** | 通过钉钉、Telegram、飞书等发送告警 | +| **日志管理** | 结构化日志存储,支持按级别、模块检索 | +| **Web 仪表盘** | 实时展示资金曲线、持仓、交易记录、策略状态等 | + +--- + +### 6. API 网关 (`api/`) + +对外提供统一接口,支持浏览器端和移动端访问。 + +| 功能 | 说明 | +| ------------------ | ----------------------------------- | +| **REST API** | 策略管理、查询持仓、查看交易记录 | +| **WebSocket** | 实时推送行情、交易信号和通知 | +| **用户认证** | JWT 认证、API Key 管理 | +| **权限管理** | 多用户角色(管理员 / 只读用户) | + +--- + +### 7. 配置与工具模块 (`common/`) + +提供跨模块共享的基础设施。 + +| 模块 | 功能 | +| -------- | ---------------------------------------- | +| **配置** | 基于 `pydantic-settings` 的环境配置管理 | +| **日志** | 基于 `loguru` 的统一日志配置 | +| **工具** | 时间工具、重试装饰器、限流器、加解密工具 | +| **模型** | 共享的 Pydantic 数据模型(订单、K 线等) | +| **常量** | 交易所枚举、订单类型、时间周期等定义 | + +--- + +## TypeScript 数据模块详解 + +### 核心架构 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ TypeScript 数据模块 │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ Binance │ │ OKX │ │ Bybit │ │ +│ │ WS Adapter │ │ WS Adapter │ │ WS Adapter │ │ +│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ +│ │ │ │ │ +│ └───────────────────┼───────────────────┘ │ +│ ▼ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ 统一行情流 (RxJS Observable) │ │ +│ │ ┌───────────┐ ┌──────────┐ ┌─────────────────┐ │ │ +│ │ │ ticker$ │ │ trade$ │ │ orderbook$ │ │ │ +│ │ └─────┬─────┘ └────┬─────┘ └───────┬─────────┘ │ │ +│ └────────┼──────────────┼────────────────┼─────────────┘ │ +│ ▼ ▼ ▼ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ K 线合成器 (时间桶算法) │ │ +│ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │ +│ │ │ 1m │ │ 5m │ │ 15m │ │ 1h │ ... │ │ +│ │ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │ │ +│ └────────┼────────┼────────┼────────┼─────────────────┘ │ +│ ▼ ▼ ▼ ▼ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ 数据清洗 & 异常检测 │ │ +│ └───────────────┬──────────────────┬───────────────────┘ │ +│ ▼ ▼ │ +│ ┌─────────────────────┐ ┌─────────────────────────┐ │ +│ │ TimescaleDB 写入器 │ │ Redis Pub/Sub 发布者 │ │ +│ └─────────────────────┘ └───────────┬─────────────┘ │ +└────────────────────────────────────────┼───────────────────┘ + │ + ▼ + ┌────────────────────┐ + │ Python 策略引擎 │ + │ (消费实时行情) │ + └────────────────────┘ +``` + +### 数据流生命周期 + +``` +交易所 WS ──► TypeScript 采集器 ──► RxJS 管道 + │ + ┌───────────┴───────────┐ + │ │ + ▼ ▼ + TimescaleDB 持久化 Redis Pub/Sub + │ │ + ▼ ▼ + Grafana 查询 Python 策略消费 + (历史分析) (实时交易) +``` + +### TypeScript 数据模块目录结构 + +``` +data/ +├── package.json +├── tsconfig.json +├── vitest.config.ts +├── src/ +│ ├── index.ts # 模块入口 +│ ├── config.ts # 配置(基于 zod 校验) +│ ├── logger.ts # 日志(pino) +│ │ +│ ├── exchanges/ # 交易所适配器 +│ │ ├── types.ts # 交易所统一接口 +│ │ ├── base.ts # 抽象基类 +│ │ ├── binance.ts # Binance 实现 +│ │ ├── okx.ts # OKX 实现 +│ │ └── bybit.ts # Bybit 实现 +│ │ +│ ├── pipeline/ # 数据管道 +│ │ ├── kline-synthesizer.ts # K 线合成器 +│ │ ├── cleaner.ts # 数据清洗 +│ │ └── transformer.ts # 数据转换 +│ │ +│ ├── storage/ # 数据存储 +│ │ ├── timescaledb.ts # TimescaleDB 写入器 +│ │ └── postgres.ts # PostgreSQL 客户端(业务数据) +│ │ +│ ├── publisher/ # 数据发布 +│ │ ├── redis.ts # Redis Pub/Sub +│ │ └── nats.ts # NATS(可选) +│ │ +│ ├── types/ # 类型定义 +│ │ ├── market.ts # Ticker, Kline, Trade… +│ │ ├── exchange.ts # 交易所枚举 +│ │ └── enums.ts # 常量枚举 +│ │ +│ └── utils/ # 工具函数 +│ ├── retry.ts # 重试逻辑 +│ ├── rate-limiter.ts # 限频器 +│ └── time.ts # 时间工具 +│ +└── tests/ + ├── exchanges/ + ├── pipeline/ + └── storage/ +``` + +### Python ↔ TypeScript 跨语言通信 + +``` +┌──────────────────┐ Redis/NATS ┌──────────────────┐ +│ 🟦 TypeScript │ ──────────────────────────► │ 🐍 Python │ +│ 数据模块 │ Pub: "market:ticker" │ 策略引擎 │ +│ │ Pub: "market:kline:1m" │ │ +│ │ Pub: "market:trade" │ Sub: 消费行情 │ +└──────────────────┘ └──────────────────┘ + │ │ + │ Redis Pub/Sub 频道设计 │ + │ ───────────────────── │ + │ market:ticker:{exchange}:{symbol} │ + │ market:kline:{exchange}:{symbol}:{interval} │ + │ market:trade:{exchange}:{symbol} │ + │ system:heartbeat:{module} │ + │ system:error:{module} │ + │ │ + └─────────────────────────────────────────────────┘ +``` + +**数据序列化格式**:推荐使用 **Protocol Buffers** 或 **MessagePack**,比 JSON 更小、更快: + +```typescript +// TypeScript 侧 - 发布 +const data = { symbol: 'BTCUSDT', price: 50000, timestamp: Date.now() }; +const encoded = msgpack.encode(data); // 比 JSON 小 30-50% +await redis.publish('market:ticker:binance:BTCUSDT', encoded); +``` + +```python +# Python 侧 - 消费 +import msgpack +async with redis.pubsub() as pubsub: + await pubsub.subscribe('market:ticker:binance:BTCUSDT') + async for msg in pubsub.listen(): + if msg['type'] == 'message': + data = msgpack.unpackb(msg['data']) + await strategy.on_ticker(Ticker(**data)) +``` + +--- + +## 开发步骤 + +### 第一阶段:基础建设(第 1-2 周) + +**目标**:搭建项目骨架,完成 TypeScript 数据模块的基础能力和 Python 项目基础。 + +| 步骤 | 任务 | 产出物 | +| ---- | ------------------------------------------------------------ | ----------------------------------- | +| 1.1 | 初始化 TypeScript 数据项目(`data/`),配置 `package.json` | 项目结构,ESLint + Prettier 配置 | +| 1.2 | 初始化 Python 项目,配置 `poetry` / `uv` 依赖管理 | `pyproject.toml`,项目目录结构 | +| 1.3 | 定义共享类型:TypeScript `types/` + Python `common/models.py` | 双端对齐的数据模型 | +| 1.4 | 实现 TS 交易所适配器基类 + Binance 适配器(`data/src/exchanges/`) | 统一接口 + WebSocket 连接 | +| 1.5 | 实现 TS 行情采集器,WebSocket 订阅实时 ticker 和 K 线 | 实时行情流入 | +| 1.6 | 实现 TS K 线合成器(`data/src/pipeline/kline-synthesizer.ts`) | 多周期 K 线实时合成 | +| 1.7 | 实现 TS 数据存储模块,写入 TimescaleDB | 数据持久化 | +| 1.8 | 实现 TS → Redis 数据发布 | 实时行情推送到消息队列 | +| 1.9 | Python 侧实现配置管理 + 日志 + Redis 订阅消费者 | 消费端就绪 | +| 1.10 | Docker Compose 编排基础服务(TimescaleDB / Redis) | `docker-compose.yml` | + +### 第二阶段:策略与回测(第 3-4 周) + +**目标**:实现策略框架和回测引擎,能编写策略并进行回测验证。 + +| 步骤 | 任务 | 产出物 | +| ---- | ------------------------------------------------------------ | -------------------------------- | +| 2.1 | 实现策略基类(`engine/base.py`) | `BaseStrategy` 抽象基类 | +| 2.2 | 实现策略管理器(`engine/manager.py`),支持策略注册和生命周期 | 策略热加载、启动/停止控制 | +| 2.3 | 实现信号分发器(`engine/signals.py`) | 事件总线,策略到执行器的信号传递 | +| 2.4 | 实现回测引擎(`engine/backtest.py`) | 历史数据回测,收益曲线、回撤等指标 | +| 2.5 | 实现技术指标计算(`data/indicators.py`) | MA、MACD、RSI 等常用指标 | +| 2.6 | 编写示例策略 1:双均线交叉策略 | 可运行的策略示例 | +| 2.7 | 编写示例策略 2:网格交易策略 | 可运行的策略示例 | +| 2.8 | 实现参数优化器(`engine/optimizer.py`) | 基于 Optuna 的参数搜索 | + +### 第三阶段:交易执行与风控(第 5-6 周) + +**目标**:连接实盘交易通道,实现完整的交易执行流程和风控体系。 + +| 步骤 | 任务 | 产出物 | +| ---- | ------------------------------------------------------------ | --------------------------------- | +| 3.1 | 实现订单管理器(`executor/order_manager.py`) | 下单、撤单、订单状态更新 | +| 3.2 | 实现仓位管理器(`executor/position_manager.py`) | 实时仓位跟踪 | +| 3.3 | 实现交易状态机(`executor/state_machine.py`) | 订单生命周期管理(Created→Filled) | +| 3.4 | 实现重试与容错机制(`executor/retry.py`) | 网络异常自动重试、幂等性保证 | +| 3.5 | 实现风控管理器(`risk/manager.py`) | 资金管理、仓位限制、熔断逻辑 | +| 3.6 | 实现风控规则引擎(`risk/rules.py`) | 可扩展的风控规则链 | +| 3.7 | 整合 TS 数据模块 → Redis → Python 策略引擎 → 交易执行的端到端流程 | 完整的跨语言运行流水线 | +| 3.8 | 编写集成测试,模拟实盘环境验证 | 测试覆盖率报告 | + +### 第四阶段:API 与监控(第 7-8 周) + +**目标**:完善系统的可观测性和对外接口。 + +| 步骤 | 任务 | 产出物 | +| ---- | ------------------------------------------------------------ | ----------------------------------- | +| 4.1 | 搭建 FastAPI 项目(`api/main.py`) | REST API 服务启动 | +| 4.2 | 实现 WebSocket 实时推送(`api/ws.py`) | 行情和交易数据实时推送 | +| 4.3 | 实现策略管理 API(`api/routes/strategies.py`) | 策略启停、参数修改接口 | +| 4.4 | 实现交易记录查询 API(`api/routes/trades.py`) | 交易历史查询 | +| 4.5 | 集成 Prometheus 指标(`monitor/metrics.py`) | 关键业务指标暴露 | +| 4.6 | 配置 Grafana 仪表盘 | 可视化资金曲线、交易频率、策略绩效 | +| 4.7 | 实现告警通知集成(`monitor/alerts.py`) | Telegram / 钉钉通知 | +| 4.8 | JWT 用户认证与 API Key 管理 | 安全访问控制 | + +### 第五阶段:优化与生产化(第 9-10 周) + +**目标**:性能优化,系统加固,上线准备。 + +| 步骤 | 任务 | 产出物 | +| ---- | ------------------------------------------------------------ | -------------------------------- | +| 5.1 | 性能优化:数据库批量写入、连接池调优、TS/Python 异步性能分析 | 性能测试报告 | +| 5.2 | 增加单元测试与集成测试,目标覆盖率 > 80% | 测试报告 | +| 5.3 | 编写部署文档(`docs/deployment.md`) | 部署手册 | +| 5.4 | 编写使用文档(`docs/usage.md`) | 用户手册 | +| 5.5 | CI/CD 流水线配置(GitHub Actions) — TS 和 Python 双管道 | 自动化测试、构建、部署 | +| 5.6 | 压力测试与稳定性测试 | 压测报告 | +| 5.7 | 生产环境部署与监控上線 | 线上系统 | + +--- + +## 项目目录结构 + +``` +trade/ +├── pyproject.toml # Python 项目依赖与配置(Poetry / uv) +├── package.json # TS 项目根依赖(可选 monorepo 管理) +├── docker-compose.yml # 基础服务编排 +├── Dockerfile # 应用容器化 +├── .env.example # 环境变量模板 +├── .gitignore +├── README.md +│ +├── config/ # Python 配置文件 +│ ├── __init__.py +│ ├── settings.py # pydantic-settings 全局配置 +│ └── strategies/ # 策略配置文件 +│ ├── ma_cross.yaml +│ └── grid_trading.yaml +│ +├── data/ # 🟦 TypeScript 数据模块 +│ ├── package.json +│ ├── tsconfig.json +│ ├── vitest.config.ts +│ ├── src/ +│ │ ├── index.ts # 模块入口 +│ │ ├── config.ts # 配置 +│ │ ├── exchanges/ # 交易所适配器 +│ │ │ ├── types.ts +│ │ │ ├── base.ts +│ │ │ ├── binance.ts +│ │ │ ├── okx.ts +│ │ │ └── bybit.ts +│ │ ├── pipeline/ # 数据管道 +│ │ │ ├── kline-synthesizer.ts +│ │ │ ├── cleaner.ts +│ │ │ └── transformer.ts +│ │ ├── storage/ # 数据存储 +│ │ │ ├── timescaledb.ts # TimescaleDB K 线写入 +│ │ │ └── postgres.ts # PostgreSQL 其他数据 +│ │ ├── publisher/ # 数据发布 +│ │ │ ├── redis.ts +│ │ │ └── nats.ts +│ │ ├── types/ # 类型定义 +│ │ │ ├── market.ts +│ │ │ ├── exchange.ts +│ │ │ └── enums.ts +│ │ └── utils/ +│ │ ├── retry.ts +│ │ ├── rate-limiter.ts +│ │ └── time.ts +│ └── tests/ +│ +├── engine/ # 🐍 Python 策略引擎 +│ ├── __init__.py +│ ├── base.py # BaseStrategy 基类 +│ ├── manager.py # 策略管理器 +│ ├── signals.py # 信号分发器 +│ ├── backtest.py # 回测引擎 +│ └── optimizer.py # 参数优化器 +│ +├── executor/ # 🐍 Python 交易执行模块 +│ ├── __init__.py +│ ├── exchange.py # 交易所适配器(基类+实现) +│ ├── order_manager.py # 订单管理器 +│ ├── position_manager.py # 仓位管理器 +│ ├── state_machine.py # 订单状态机 +│ └── retry.py # 重试与容错 +│ +├── risk/ # 🐍 Python 风控模块 +│ ├── __init__.py +│ ├── manager.py # 风控管理器 +│ └── rules.py # 风控规则 +│ +├── monitor/ # 监控告警模块 +│ ├── __init__.py +│ ├── metrics.py # Prometheus 指标 +│ ├── alerts.py # 告警通知 +│ └── dashboard.py # 仪表盘数据聚合 +│ +├── api/ # API 网关 +│ ├── __init__.py +│ ├── main.py # FastAPI 入口 +│ ├── ws.py # WebSocket 处理 +│ ├── auth.py # 用户认证 +│ └── routes/ # 路由 +│ ├── __init__.py +│ ├── strategies.py +│ ├── trades.py +│ └── account.py +│ +├── strategies/ # 🐍 Python 策略实现 +│ ├── __init__.py +│ ├── ma_cross.py # 双均线交叉策略 +│ ├── grid_trading.py # 网格交易策略 +│ └── arbitrage.py # 套利策略(可选) +│ +├── common/ # 🐍 Python 公共工具模块 +│ ├── __init__.py +│ ├── logger.py # 日志配置 +│ ├── models.py # 数据模型(Pydantic,与 TS types 对应) +│ ├── constants.py # 常量定义 +│ └── utils.py # 工具函数 +│ +├── tests/ # 🐍 Python 测试 +│ ├── __init__.py +│ ├── data/ +│ ├── engine/ +│ ├── executor/ +│ ├── risk/ +│ └── conftest.py # pytest fixtures +│ +└── docs/ # 文档 + ├── deployment.md + ├── usage.md + └── api_reference.md +``` + +--- + +## 部署与运维 + +### 服务组件依赖 + +``` +┌──────────────────┐ ┌──────────────┐ ┌─────────────┐ +│ TS 数据模块 │ │ PostgreSQL │ │ Grafana │ +│ (Node.js 进程) │ │ (业务数据) │ │ (可视化) │ +└──────┬───────────┘ └──────────────┘ └──────┬──────┘ + │ │ + │ ┌──────────────┐ │ + ├────────────▶ TimescaleDB │◀────────────┤ + │ │ (时序存储) │ │ + │ └──────────────┘ │ + │ │ + │ ┌──────────────┐ │ + ├────────────▶ Redis ├─────────────┤ + │ │ (队列/缓存) │ │ + │ └──────┬───────┘ │ + │ │ │ + │ ┌──────▼───────┐ │ + └────────────▶ Python 引擎 │ │ + │ (策略+执行) │ │ + └──────────────┘ │ +``` + +### 容器化方案 + +```dockerfile +# Dockerfile.data — TypeScript 数据模块 +FROM node:20-alpine AS builder +WORKDIR /app +COPY data/package*.json ./ +RUN npm ci +COPY data/ . +RUN npm run build + +FROM node:20-alpine AS runner +WORKDIR /app +COPY --from=builder /app/dist ./dist +COPY --from=builder /app/node_modules ./node_modules +CMD ["node", "dist/index.js"] +``` + +```dockerfile +# Dockerfile.engine — Python 业务引擎 +FROM python:3.11-slim +WORKDIR /app +COPY pyproject.toml poetry.lock ./ +RUN pip install poetry && poetry install --no-dev +COPY . . +CMD ["poetry", "run", "python", "-m", "engine.main"] +``` + +### 关键运维指标 + +- **API 延迟**:单次下单 < 500ms(网络延迟不计) +- **数据延迟**:行情从交易所到 Redis < 500ms(TypeScript 采集) +- **系统可用性**:核心交易链路 > 99.9% +- **策略执行周期**:秒级 tick 策略 < 100ms / 次 +- **跨语言通信延迟**:Redis Pub/Sub < 1ms(同机部署) + +--- + +## 注意事项与风险提示 + +> ⚠️ **风险声明**:数字货币交易具有高风险,本系统仅为技术实现,不构成任何投资建议。使用前请充分了解风险。 + +### 技术注意事项 + +1. **API 限频**:各交易所均有严格 API 限频,需实现本地限流器 +2. **WebSocket 重连**:网络波动导致断线,需实现心跳检测和自动重连(指数退避策略) +3. **数据一致性**:订单状态需轮询确认,避免因异步回调丢失状态 +4. **时间同步**:服务器需 NTP 同步,防止时间偏差导致下单失败 +5. **日志安全**:避免记录 API Secret 等敏感信息到日志 +6. **灰度发布**:新策略先在模拟盘运行,验证通过后方可接入实盘 +7. **资金隔离**:不同策略的资金账户严格隔离,防止交叉影响 +8. **跨语言类型一致性**:TypeScript 的 `zod` schema 和 Python 的 `Pydantic` model 需保持同步,建议用自动化脚本生成或共享 schema 文件 + +### 开发建议 + +1. **模块化开发**:各模块独立开发、独立测试,通过接口契约协作 +2. **代码质量**:TS 用 `ESLint` + `Prettier`,Python 用 `ruff` + `mypy` +3. **版本控制**:配置文件和策略参数不应硬编码,使用 `.env` 或配置文件 +4. **日志先行**:先写好日志,再写业务逻辑,便于调试 +5. **单元测试**:核心逻辑(风控、订单状态机、K 线合成器)必须有单元测试覆盖 +6. **模拟盘先行**:实盘前至少 2 周模拟盘验证策略稳定性 +7. **本地开发**:Python 和 TS 模块分别 `npm run dev` / `poetry run dev` 独立开发调试 +8. **schema 驱动**:考虑使用 Protocol Buffers 的 `.proto` 文件同时生成 TS 和 Python 代码,保证类型完全一致 + +--- + +## 许可证 + +MIT License diff --git a/data/.env.example b/data/.env.example new file mode 100644 index 0000000..11c53f1 --- /dev/null +++ b/data/.env.example @@ -0,0 +1,42 @@ +# ============================================================ +# Trade Data Module — 环境变量配置模板 +# ============================================================ +# 复制为 .env 并修改: +# cp .env.example .env +# ============================================================ + +# --- 行情订阅 --- +# 逗号分隔的交易对列表(大写) +SYMBOLS=BTCUSDT,ETHUSDT + +# --- TimescaleDB 连接 --- +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=trade +DB_USER=trader +DB_PASSWORD=changeme + +# --- Redis 连接 --- +REDIS_URL=redis://localhost:6379 +# 是否启用 Redis 发布(开发时可关闭) +REDIS_PUBLISH_ENABLED=true + +# --- 批量写入 --- +# 缓冲区条数阈值(达到后自动刷新) +BATCH_SIZE=500 +# 最大缓冲时间(毫秒),超时后自动刷新 +FLUSH_INTERVAL_MS=1000 + +# --- WebSocket 连接 --- +# 断线重连延迟基数(毫秒),指数退避:基数 × 2^attempts +WS_RECONNECT_DELAY_MS=3000 +# 心跳间隔(毫秒) +WS_PING_INTERVAL_MS=30000 +# 最大重连次数 +WS_MAX_RECONNECT_ATTEMPTS=10 + +# --- 日志 --- +# 日志级别:trace / debug / info / warn / error / fatal +LOG_LEVEL=debug +# 生产环境(关闭 pretty print,输出 JSON) +NODE_ENV=development diff --git a/data/init-db/001_init.sql b/data/init-db/001_init.sql new file mode 100644 index 0000000..3bab82d --- /dev/null +++ b/data/init-db/001_init.sql @@ -0,0 +1,228 @@ +-- ============================================================ +-- 001_init.sql — TimescaleDB 数据初始化 +-- +-- Docker Compose 首次启动时自动执行 +-- 挂载路径:./data/init-db:/docker-entrypoint-initdb.d +-- ============================================================ + +-- 扩展 +CREATE EXTENSION IF NOT EXISTS timescaledb; +CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit; + +-- ============================================================ +-- 1. K 线主表 +-- ============================================================ +CREATE TABLE IF NOT EXISTS klines ( + -- 时间维度 + time TIMESTAMPTZ NOT NULL, -- K 线开盘时间(UTC) + + -- 标识维度 + exchange TEXT NOT NULL, -- 交易所:binance/okx/bybit + symbol TEXT NOT NULL, -- 交易对:BTCUSDT/ETHUSDT + interval TEXT NOT NULL, -- 周期:1m/5m/15m/1h/4h/1d + + -- OHLCV + open NUMERIC(20,8) NOT NULL, + high NUMERIC(20,8) NOT NULL, + low NUMERIC(20,8) NOT NULL, + close NUMERIC(20,8) NOT NULL, + volume NUMERIC(20,8) NOT NULL DEFAULT 0, -- 成交量(基准币种) + + -- 扩展字段 + quote_volume NUMERIC(20,8) DEFAULT 0, -- 成交额(计价币种) + taker_buy_base_vol NUMERIC(20,8) DEFAULT 0, -- 主动买入量 + taker_buy_quote_vol NUMERIC(20,8) DEFAULT 0, -- 主动买入额 + trade_count INTEGER DEFAULT 0, -- 成交笔数 + is_closed BOOLEAN DEFAULT TRUE, -- K 线是否已闭合 + + -- 元数据 + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + + -- 唯一约束(同一根 K 线不可重复) + UNIQUE (time, exchange, symbol, interval) +); + +-- ============================================================ +-- 2. 转换为 hypertable(时序分区) +-- ============================================================ +SELECT create_hypertable( + 'klines', + 'time', -- 时间列 + chunk_time_interval => INTERVAL '1 day', -- 每个 chunk = 1 天数据 + partitioning_column => 'exchange', -- 空间分区列 + number_partitions => 4, -- 4 个空间分区 + if_not_exists => TRUE +); + +-- ============================================================ +-- 3. 索引 +-- ============================================================ + +-- 主力查询索引:按交易对+周期+时间范围查(覆盖 95% 查询) +CREATE INDEX IF NOT EXISTS idx_klines_lookup + ON klines (exchange, symbol, interval, time DESC); + +-- 回测专用索引:按交易对+周期+时间正序 +CREATE INDEX IF NOT EXISTS idx_klines_backtest + ON klines (symbol, interval, time ASC); + +-- 最新 K 线索引(部分索引,仅覆盖已闭合 K 线) +CREATE INDEX IF NOT EXISTS idx_klines_latest + ON klines (exchange, symbol, interval, time DESC) + WHERE is_closed = TRUE; + +-- ============================================================ +-- 4. 压缩策略 +-- ============================================================ + +-- 启用列式压缩(按 symbol+interval 分组,按 time 排序) +ALTER TABLE klines SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'exchange, symbol, interval', + timescaledb.compress_orderby = 'time DESC' +); + +-- 自动压缩:K 线闭合 7 天后自动压缩(压缩比约 90%) +SELECT add_compression_policy('klines', INTERVAL '7 days', if_not_exists => TRUE); + +-- ============================================================ +-- 5. 数据保留策略 +-- ============================================================ +-- 1m K 线保留 90 天(回测通常用更粗粒度) +SELECT add_retention_policy('klines', INTERVAL '90 days', if_not_exists => TRUE); + +-- ============================================================ +-- 6. 连续聚合(从 1m 自动派生高周期 K 线) +-- ============================================================ + +-- ---------- 5m K 线 ---------- +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_5m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('5 minutes', time) AS time, + exchange, + symbol, + '5m'::TEXT AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count) AS trade_count +FROM klines +WHERE interval = '1m' +GROUP BY time_bucket('5 minutes', time), exchange, symbol; + +SELECT add_continuous_aggregate_policy('klines_5m', + start_offset => INTERVAL '1 day', + end_offset => INTERVAL '10 minutes', + schedule_interval => INTERVAL '1 minute', + if_not_exists => TRUE +); + +-- ---------- 15m K 线 ---------- +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_15m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('15 minutes', time) AS time, + exchange, + symbol, + '15m'::TEXT AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count) AS trade_count +FROM klines +WHERE interval = '1m' +GROUP BY time_bucket('15 minutes', time), exchange, symbol; + +SELECT add_continuous_aggregate_policy('klines_15m', + start_offset => INTERVAL '2 days', + end_offset => INTERVAL '30 minutes', + schedule_interval => INTERVAL '5 minutes', + if_not_exists => TRUE +); + +-- ---------- 1h K 线 ---------- +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_1h +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 hour', time) AS time, + exchange, + symbol, + '1h'::TEXT AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count) AS trade_count +FROM klines +WHERE interval = '1m' +GROUP BY time_bucket('1 hour', time), exchange, symbol; + +SELECT add_continuous_aggregate_policy('klines_1h', + start_offset => INTERVAL '3 days', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '5 minutes', + if_not_exists => TRUE +); + +-- ---------- 1d K 线 ---------- +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_1d +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 day', time) AS time, + exchange, + symbol, + '1d'::TEXT AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count) AS trade_count +FROM klines +WHERE interval = '1m' +GROUP BY time_bucket('1 day', time), exchange, symbol; + +SELECT add_continuous_aggregate_policy('klines_1d', + start_offset => INTERVAL '7 days', + end_offset => INTERVAL '2 hours', + schedule_interval => INTERVAL '1 hour', + if_not_exists => TRUE +); + +-- ============================================================ +-- 7. 连续聚合的压缩(减少视图存储) +-- ============================================================ +ALTER MATERIALIZED VIEW klines_5m SET (timescaledb.compress = true); +ALTER MATERIALIZED VIEW klines_15m SET (timescaledb.compress = true); +ALTER MATERIALIZED VIEW klines_1h SET (timescaledb.compress = true); +ALTER MATERIALIZED VIEW klines_1d SET (timescaledb.compress = true); + +-- ============================================================ +-- 初始化完成 +-- ============================================================ +DO $$ +BEGIN + RAISE NOTICE 'TimescaleDB initialization complete.'; + RAISE NOTICE 'Hypertable: klines'; + RAISE NOTICE 'Continuous aggregates: klines_5m, klines_15m, klines_1h, klines_1d'; + RAISE NOTICE 'Compression: 7 days delay, 90 days retention'; +END $$; diff --git a/data/package.json b/data/package.json new file mode 100644 index 0000000..d88b975 --- /dev/null +++ b/data/package.json @@ -0,0 +1,34 @@ +{ + "name": "trade-data", + "version": "0.1.0", + "description": "数字货币量化交易系统 - TypeScript 数据模块", + "type": "module", + "scripts": { + "dev": "tsx watch src/index.ts", + "build": "tsc", + "start": "node dist/index.js", + "test": "vitest run", + "test:watch": "vitest", + "lint": "eslint src/", + "format": "prettier --write src/" + }, + "dependencies": { + "binance": "^3.5.9", + "ccxt": "^4.5.56", + "ioredis": "^5.11.1", + "pg": "^8.21.0", + "pino": "^10.3.1", + "ws": "^8.21.0", + "zod": "^4.4.3" + }, + "devDependencies": { + "@types/node": "^25.9.2", + "@types/pg": "^8.20.0", + "@types/ws": "^8.18.1", + "eslint": "^10.4.1", + "prettier": "^3.8.3", + "tsx": "^4.22.4", + "typescript": "^6.0.3", + "vitest": "^4.1.8" + } +} diff --git a/data/tsconfig.json b/data/tsconfig.json new file mode 100644 index 0000000..734c0ef --- /dev/null +++ b/data/tsconfig.json @@ -0,0 +1,31 @@ +{ + "compilerOptions": { + // Environment setup & latest features + "lib": [ + "ESNext" + ], + "target": "ESNext", + "module": "Preserve", + "moduleDetection": "force", + "jsx": "react-jsx", + "allowJs": true, + "types": [ + "bun" + ], + // Bundler mode + "moduleResolution": "bundler", + "allowImportingTsExtensions": true, + "verbatimModuleSyntax": true, + "noEmit": true, + // Best practices + "strict": true, + "skipLibCheck": true, + "noFallthroughCasesInSwitch": true, + "noUncheckedIndexedAccess": true, + "noImplicitOverride": true, + // Some stricter flags (disabled by default) + "noUnusedLocals": false, + "noUnusedParameters": false, + "noPropertyAccessFromIndexSignature": false + } +} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..58070ef --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,28 @@ +services: + timescaledb: + image: timescale/timescaledb-ha:pg17.10-ts2.27.1 + container_name: trade-timescaledb + restart: unless-stopped + ports: + - "5432:5432" + environment: + POSTGRES_DB: trade + POSTGRES_USER: trader + POSTGRES_PASSWORD: fucketh + volumes: + - ./db/pgsql:/var/lib/postgresql + - ./data/init-db:/docker-entrypoint-initdb.d # 自动执行建表 SQL + command: > + -c shared_buffers=1GB + -c effective_cache_size=3GB + -c maintenance_work_mem=256MB + -c work_mem=64MB + -c wal_buffers=64MB + -c random_page_cost=1.1 + -c effective_io_concurrency=200 + -c max_connections=50 + healthcheck: + test: ["CMD-SHELL", "pg_isready -U trader -d trade"] + interval: 10s + timeout: 5s + retries: 5