Files
trade/README.md
T
Rekey b5cdb41993 chore: 更新 README 架构文档与数据库测试脚本
- README.md: 更新数据层 Node.js→Bun,common→engine/common,同步目录树结构
- db_test.py: TimescaleDB 数据库连接与基础查询测试脚本
2026-06-12 10:27:11 +08:00

1746 lines
73 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 数字货币量化交易系统开发计划
> 基于 Python + TypeScript 混合架构的数字货币量化交易系统 —— 核心模块划分与开发步骤
---
## 目录
1. [项目概述](#项目概述)
2. [技术栈选型](#技术栈选型)
3. [架构方案对比](#架构方案对比)
4. [核心模块划分](#核心模块划分)
5. [TypeScript 数据模块详解](#typescript-数据模块详解)
6. [开发步骤](#开发步骤)
7. [项目目录结构](#项目目录结构)
8. [部署与运维](#部署与运维)
9. [注意事项与风险提示](#注意事项与风险提示)
---
## 项目概述
本项目的目标是构建一套**可扩展、低延迟、稳定可靠**的数字货币量化交易系统,支持:
- 多交易所接入(Binance、OKX、Bybit 等)
- 多策略并行运行
- 实时行情数据采集与存储
- 策略回测与参数优化
- 实盘交易执行与风控管理
- 可视化监控与告警
### 架构策略
采用 **Python + TypeScript 混合架构**
| 层 | 语言 | 职责 |
|---|------|------|
| **数据层** | TypeScript (Bun) | 行情采集、WebSocket 连接管理、K 线合成、数据写入 |
| **业务层** | Python 3.10+ | 策略引擎、回测、风控、交易执行 |
| **接口层** | TypeScript / Python | FastAPI (Python) 或 NestJS (TS) 提供 REST/WS API |
---
## 技术栈选型
| 分类 | 技术 / 库 | 说明 |
| ------------ | ---------------------------------- | -------------------------------------- |
| **数据层语言** | **TypeScript 5.x (Bun)** | 行情采集、WebSocket 管理、数据管道 |
| **业务层语言** | **Python 3.10+** | 策略引擎、回测、风控逻辑 |
| **时序数据库** | **TimescaleDB (推荐)** | K 线数据存储,基于 PostgreSQL 扩展 |
| 关系型数据库 | PostgreSQL 16+ | 订单、策略配置、用户数据等(TimescaleDB 基于 PG,可共用)|
| 消息队列 | Redis / NATS | 跨语言数据流解耦,事件驱动 |
| 异步框架(TS) | `ws` + `axios` + `bull` | WebSocket 客户端、HTTP 请求、任务队列 |
| 异步框架(Py) | `asyncio` + `aiohttp` / `httpx` | 异步 I/O,高性能网络请求 |
| 数据分析 | `pandas` + `numpy` + `ta` / `talib` | 技术指标计算与数据分析 |
| 策略回测 | `backtrader` / `vectorbt` / 自研 | 高性能回测引擎 |
| Web 框架 | FastAPI + Uvicorn | REST API 与 WebSocket 服务 |
| 可视化 | Grafana + Streamlit | 监控仪表盘与交互式分析 |
| 任务调度 | Celery / APScheduler / Bull | 定时任务与异步任务队列 |
| 配置管理 | pydantic-settings + `.env` | 类型安全的配置管理 |
| 日志监控 | loguru + winston + sentry | 结构化日志与异常追踪 |
| **类型校验** | **Zod (TS) / Pydantic (Py)** | 运行时数据校验,双端类型一致性保证 |
---
## 架构方案对比
### 方案 A:纯 Python(原方案)
```
[Python] 数据采集 → 策略引擎 → 交易执行 → 风控
```
| 优势 | 劣势 |
|------|------|
| 单一语言,维护简单 | WebSocket 大规模连接管理不如 Node.js |
| 数据分析生态无敌 | Python GIL 在 CPU 密集型场景受限 |
| 量化金融社区成熟 | 异步生态相对 Node.js 较新 |
### 方案 BTypeScript 数据模块 + Python 业务引擎 ✅ **(推荐)**
```
[TypeScript] 数据采集 → Redis/消息队列 → [Python] 策略引擎 → 交易执行
```
| 优势 | 劣势 |
|------|------|
| Node.js WebSocket 性能优异,天然适合高并发连接 | 需要维护两套技术栈 |
| TypeScript 类型系统在数据管道中减少运行时错误 | 跨语言调试稍复杂 |
| **共享类型**:前后端可用同一套 TypeScript 类型定义 | 部署需要 Bun + Python 双运行时 |
| 事件驱动模型与行情数据流天然匹配 | 团队需要双语言能力 |
| npm 生态有成熟的交易 SDKccxt 等) | |
### 方案 C:全 TypeScript
```
[TypeScript] 数据采集 → 策略引擎 → 交易执行 → 风控
```
| 优势 | 劣势 |
|------|------|
| 单一语言 | 回测/量化分析生态远不如 Python |
| 类型安全 | 缺乏 pandas/numpy 级别的数据处理库 |
| 全栈一致性(前后端 + 数据层共用 TS) | 量化社区资源少,需要自研大量基础设施 |
---
## 核心模块划分
系统划分为 **7 大核心模块**,模块间通过消息队列或 API 解耦,整体架构如下:
```
┌─────────────────────────────────────────────────────────────┐
│ Web UI / Dashboard │
│ (FastAPI / Streamlit / Grafana) │
└──────────────┬──────────────────┬───────────────────────────┘
│ │
┌──────────▼──────────┐ ┌───▼────────────────────┐
│ API Gateway │ │ 监控告警模块 │
│ (REST + WebSocket) │ │ (Prometheus + Alert) │
└──────────┬──────────┘ └────────────────────────┘
┌──────────▼──────────────────────────────────────────┐
│ 策略引擎 🐍 Python │
│ (策略加载 / 生命周期管理 / 信号分发) │
└────┬──────────────┬──────────────┬─────────────────┘
│ │ │
┌────▼────┐ ┌─────▼─────┐ ┌───▼──────────┐
│ 策略 A │ │ 策略 B │ │ 策略 C ... │
│(趋势) │ │ (网格) │ │ │
└────┬────┘ └─────┬─────┘ └───┬──────────┘
│ │ │
┌────▼──────────────▼──────────────▼─────────────────┐
│ 交易执行模块 🐍 Python │
│ (订单管理 / 仓位管理 / 交易所适配器 / 重试机制) │
└────────────────────┬───────────────────────────────┘
┌────────────────────▼───────────────────────────────┐
│ 消息队列 (Redis / NATS) │
│ ┌───────────── 数据管道 ─────────────┐ │
└────────────────────┬───────────────────────────────┘
┌────────────────────▼───────────────────────────────┐
│ 数据模块 🟦 TypeScript │
│ (行情采集 / K线合成 / 数据存储 / 数据清洗 / 因子计算) │
└────────────────────┬───────────────────────────────┘
┌────────────────────▼───────────────────────────────┐
│ 风控模块 🐍 Python │
│ (资金管理 / 仓位限制 / 最大回撤 / 异常交易熔断 / 白名单)│
└─────────────────────────────────────────────────────┘
```
---
### 1. 数据模块 (`data/`) — 🟦 TypeScript
负责从交易所获取和处理市场数据,是一切策略和决策的基础。
#### 为什么数据模块用 TypeScript
| 原因 | 说明 |
|------|------|
| **WebSocket 性能** | Node.js 事件循环天然适合处理大量 WebSocket 连接(单进程万级连接) |
| **内存管理** | V8 引擎的垃圾回收机制在短生命周期对象场景表现优异 |
| **类型安全** | TypeScript 编译期检查确保数据管道中的类型正确性 |
| **流式处理** | Node.js Stream / Observable 模式与行情数据流完美匹配 |
| **生态优势** | 各交易所官方/社区 SDK 对 Node.js 支持最好 |
#### 子模块划分
| 子模块 | 职责 | 关键技术点 |
| ---------------- | -------------------------------------------------------------------- | ----------------------------------------------- |
| **行情采集器** | 通过 WebSocket 订阅实时行情(ticker、orderbook、trade),定时拉取K线 | `ws`、断线重连、心跳检测、连接池管理 |
| **K 线合成器** | 将实时 tick/trade 流合成为 OHLCV K 线,支持多周期并行、补齐、异常处理 | 时间桶算法、增量更新、多级合成(1m→5m→15m→1h) |
| **数据存储** | K 线数据持久化到 TimescaleDB,订单数据存入 PostgreSQL | TimescaleDB hypertable、批量 UPSERT、列式压缩 |
| **数据清洗** | 去除异常 tick、填充缺失值、处理停盘数据 | 统计异常检测、插值法 |
| **数据发布** | 将处理后的数据通过消息队列推送给 Python 策略引擎 | Redis Pub/Sub、NATS、协议缓冲(Protocol Buffers |
#### 关键接口
```typescript
// src/data/collector.ts
export interface DataFeed {
subscribeTicker(symbols: string[]): Promise<void>;
subscribeOrderbook(symbol: string, depth: number): Promise<void>;
fetchKlines(symbol: string, interval: KlineInterval, limit?: number): Promise<Kline[]>;
}
// src/data/types.ts
export interface Ticker {
exchange: string;
symbol: string;
price: number;
volume: number;
timestamp: number;
}
export interface Kline {
exchange: string;
symbol: string;
interval: KlineInterval;
open: number;
high: number;
low: number;
close: number;
volume: number;
timestamp: number;
}
// src/data/pipeline.ts
export class DataPipeline {
constructor(
private collector: DataFeed,
private klineSynthesizer: KlineSynthesizer,
private storage: DataStorage,
private publisher: DataPublisher,
) {}
async start(): Promise<void> {
// 启动数据流管道
const tickerStream = await this.collector.subscribeTicker(['BTCUSDT', 'ETHUSDT']);
tickerStream
.pipe(this.klineSynthesizer.transform())
.pipe(this.storage.writeStream())
.pipe(this.publisher.publishStream());
}
}
```
---
#### K 线合成器详解
K 线合成器是数据模块中最核心的组件之一,负责将交易所推送的 **实时离散数据** 合成为标准的 **OHLCV K 线**。以下是其完整的工作机制。
##### 1. 输入数据源
合成器接收三种类型的原始数据:
| 数据源 | 交易所推送频率 | 数据量 | 用途 |
|--------|---------------|--------|------|
| **Ticker**(最新成交价) | 100ms~1s/次 | 中等 | 实时价格监控,快速 K 线更新 |
| **Trade**(逐笔成交) | 毫秒级 | 极大 | 精确 K 线合成(推荐) |
| **K 线(原始)** | 1s~1min(交易所合成好的) | 小 | 直接使用,无需合成(最快方案) |
> **推荐方案**:使用 `Trade`(逐笔成交)作为主数据源合成 K 线,精度最高;`Ticker` 作为辅助用于快速预览。
##### 2. 核心算法:时间桶(Time Bucket
```
时间轴(1分钟 K 线为例):
时间桶 [t0, t0+1min)
┌─────────────────────────────────────────┐
│ Trade A Trade B Trade C │
│ p: 50000 p: 50020 p: 50010 │
│ v: 0.5 v: 1.2 v: 0.8 │
│ t: 00:10 t: 00:25 t: 00:45 │
│ │
│ OHLCV = { O: 50000, H: 50020, │
│ L: 50000, C: 50010, │
│ V: 2.5 } │
└─────────────────────────────────────────┘
```
**算法步骤**
```
每收到一条 Trade/Ticker
1. 计算时间桶索引:bucketIndex = floor(timestamp / interval_ms)
2. 如果 bucketIndex != currentBucketIndex:
a. 关闭当前桶,emit K 线
b. 创建新桶,初始化 O=H=L=C=当前价格, V=当前成交量
3. 否则(仍在当前桶内):
a. O = 不变(桶内第一条的价格)
b. H = max(H, 当前价格)
c. L = min(L, 当前价格)
d. C = 当前价格
e. V += 当前成交量
```
##### 3. 多周期并行合成
系统需要同时维护多个周期的 K 线(1m、5m、15m、1h...),有两种实现策略:
**策略 A:独立合成(每个周期独立计算)** ✅ 推荐
```
Trade 流 ──► 1m 合成器 ──► 1m K 线
├──► 5m 合成器 ──► 5m K 线
├──► 15m 合成器 ─► 15m K 线
└──► 1h 合成器 ──► 1h K 线
```
- 每个周期维护独立的时间桶
- 优点:实现简单,各周期互不影响
- 缺点:内存占用随周期数量线性增长
**策略 B:多级合成(从低周期聚合为高周期)**
```
Trade 流 ──► 1m 合成器 ──► 5m 合成器 ──► 15m 合成器 ──► 1h 合成器
↑ 由 5 根 1m 合成 ↑ 由 3 根 5m 合成
```
- 高周期 K 线由低周期 K 线聚合而成
- 优点:内存效率高,无需为每个周期维护独立的 Trade 缓存
- 缺点:低周期 K 线未完成时,高周期无法产生完整 K 线
**混合策略(推荐)**
```
Trade 流 ──► 1m 合成器(实时,数据源为 Trade)
1m K 线 ──► 5m 聚合器(由 5 根 1m 聚合)
5m K 线 ──► 15m 聚合器
15m K 线 ──► 1h 聚合器
```
1m 用 Trade 实时合成(精度最高),5m 及以上从 1m 聚合(计算量最小)。
##### 4. K 线补齐与边界处理
这是合成器最容易被忽视但极其重要的部分:
| 场景 | 问题 | 处理方案 |
|------|------|----------|
| **无成交周期** | 某个 1m 区间内没有任何 Trade | 使用前一根 K 线的 Close 作为当前 K 线的 OHLC(平盘),Volume=0 |
| **交易所暂停** | 交易所临时停盘或维护 | 停止 emit 新 K 线,标记为 `gap: true`,策略端跳过 |
| **周期边界错位** | 交易所 1h K 线从整点开始,本地合成可能有时区偏移 | 使用 UTC 时间统一对齐,`bucketIndex = floor(utcTimestamp / intervalMs) * intervalMs` |
| **首根 K 线不完整** | 系统启动时间不在周期起点,第一根 K 线数据不足 | emit 时标记 `isPartial: true`,策略端可选择忽略或使用 |
| **数据延迟/乱序** | Trade 到达顺序与发生顺序不一致 | 缓冲区保留 100ms 的排序窗口,按 timestamp 排序后处理 |
##### 5. 增量更新 vs 完整替换
```typescript
// 增量更新模式(推荐)—— 只推送变化部分,大幅减少数据量
interface KlineDelta {
exchange: string;
symbol: string;
interval: KlineInterval;
bucketTime: number; // 时间桶起始时间戳(毫秒)
o: number; // Open(只在开桶时更新)
h: number; // High
l: number; // Low
c: number; // Close
v: number; // Volume
isClosed: boolean; // 当前桶是否已关闭(不再变化)
tradeCount: number; // 桶内的成交笔数
}
// 完整 K 线(用于回测和历史数据)
interface Kline {
exchange: string;
symbol: string;
interval: KlineInterval;
openTime: number;
closeTime: number;
open: number;
high: number;
low: number;
close: number;
volume: number;
quoteVolume: number; // 成交额
takerBuyBaseVolume: number; // 主动买入量
takerBuyQuoteVolume: number; // 主动买入额
tradeCount: number; // 成交笔数
}
```
##### 6. 完整实现示例
```typescript
// src/data/pipeline/kline-synthesizer.ts
import { Subject, interval } from 'rxjs';
import { bufferTime, filter, map } from 'rxjs/operators';
type KlineInterval = '1m' | '5m' | '15m' | '30m' | '1h' | '4h' | '1d';
const INTERVAL_MS: Record<KlineInterval, number> = {
'1m': 60_000,
'5m': 300_000,
'15m': 900_000,
'30m': 1_800_000,
'1h': 3_600_000,
'4h': 14_400_000,
'1d': 86_400_000,
};
interface Trade {
price: number;
amount: number;
timestamp: number;
}
interface KlineBucket {
openTime: number;
open: number;
high: number;
low: number;
close: number;
volume: number;
tradeCount: number;
isClosed: boolean;
}
export class KlineSynthesizer {
private buckets = new Map<string, KlineBucket>();
/**
* 核心合成方法:输入一条 Trade,输出(可能)闭合的 K 线
*/
synthesize(
symbol: string,
interval: KlineInterval,
trade: Trade,
): KlineBucket | null {
const intervalMs = INTERVAL_MS[interval];
const bucketTime =
Math.floor(trade.timestamp / intervalMs) * intervalMs;
const key = `${symbol}:${interval}:${bucketTime}`;
let bucket = this.buckets.get(key);
if (!bucket) {
// 新时间桶:初始化
bucket = {
openTime: bucketTime,
open: trade.price,
high: trade.price,
low: trade.price,
close: trade.price,
volume: trade.amount,
tradeCount: 1,
isClosed: false,
};
this.buckets.set(key, bucket);
return null; // 新桶第一条,暂不 emit
}
// 更新当前桶
bucket.high = Math.max(bucket.high, trade.price);
bucket.low = Math.min(bucket.low, trade.price);
bucket.close = trade.price;
bucket.volume += trade.amount;
bucket.tradeCount += 1;
return null; // 桶未关闭
}
/**
* 关闭过期桶(由定时器调用),返回所有已关闭的 K 线
*/
closeExpiredBuckets(now: number): KlineBucket[] {
const closed: KlineBucket[] = [];
for (const [key, bucket] of this.buckets.entries()) {
if (!bucket.isClosed && now >= bucket.openTime + INTERVAL_MS['1m']) {
bucket.isClosed = true;
closed.push(bucket);
this.buckets.delete(key);
}
}
return closed;
}
/**
* RxJS 操作符:将 Trade 流转换为 K 线流
*/
transform(interval: KlineInterval) {
return (source: Subject<Trade>) => {
const output = new Subject<KlineBucket>();
// 每笔 Trade 更新桶
source.subscribe((trade) => {
this.synthesize('BTCUSDT', interval, trade);
});
// 定时检查过期桶(每秒一次)
interval(1000).subscribe(() => {
const closed = this.closeExpiredBuckets(Date.now());
closed.forEach((k) => output.next(k));
});
return output;
};
}
}
```
##### 7. 性能考量
| 指标 | 数据量 | 说明 |
|------|--------|------|
| **单币种 Trade/秒** | ~50~200 tps | BTCUSDT 高峰期 |
| **单币种 Ticker/秒** | ~10 tps | Binance WebSocket 推送频率 |
| **内存开销/币种** | ~1~5 MB | 100 个并行时间桶 |
| **单笔合成耗时** | < 1 μs | Node.js V8 优化后 |
| **10 币种 5 周期并行** | ~500 μs/轮 | 全部更新一次 |
> **优化建议**:对于高频币种(BTC/ETH),可以采用 Trade 合成 + 1m K 线直接复用交易所推送的原始 K 线(无需自行合成)。
##### 8. 与交易所 K 线的差异处理
| 差异项 | 交易所推送 K 线 | 本地合成 K 线 |
|--------|---------------|-------------|
| **延迟** | 通常延迟 1~5 秒 | 实时(Trade 到达即更新) |
| **精度** | 包含所有成交数据 | 取决于 WebSocket 推送的 Trade 覆盖率 |
| **完整性** | 最终数据,不可变 | 可能因断线导致缺失(需补齐) |
| **自定义周期** | 仅支持标准周期 | 支持任意周期(如 3m、7m、自定义) |
| **多交易所对齐** | 各交易所时间不同 | 使用 UTC 统一对齐 |
> **最佳实践**:本地合成 K 线用于 **实时策略决策**,交易所 K 线用于 **回测和历史分析**。两者结合使用。
---
#### TimescaleDB 数据存储方案
K 线数据具有典型的时序特征:**持续写入、按时间范围查询、少有更新、数据量大**。TimescaleDB 作为 PostgreSQL 的时序扩展,相比 InfluxDB 有以下核心优势:
| 对比维度 | TimescaleDB | InfluxDB |
|---------|------------|----------|
| **基础数据库** | PostgreSQL 扩展 | 独立时序数据库 |
| **SQL 兼容** | 完整 SQL 支持(JOIN、子查询、窗口函数) | 类 SQL 但不完全兼容 |
| **与业务数据关联** | K 线表和订单表可以在同一个 PG 实例中 JOIN | 需要跨数据库查询 |
| **数据压缩** | 列式压缩,压缩比 90%+ | 压缩比约 85% |
| **连续聚合** | 内置自动刷新物化视图 | 需要外部工具 |
| **保留策略** | 内置自动删除(数据保留策略) | 内置 |
| **生态集成** | 可与 PostGIS、pgvector 等 PG 扩展共存 | 独立生态 |
| **运维成本** | 复用 PG 运维经验 | 需独立运维 |
##### 1. 表结构设计
```sql
-- 创建 TimescaleDB 扩展
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;
-- ============================================
-- 1. K 线主表(hypertable
-- ============================================
CREATE TABLE klines (
time TIMESTAMPTZ NOT NULL, -- K 线开盘时间
exchange TEXT NOT NULL, -- 交易所 (binance/okx/bybit)
symbol TEXT NOT NULL, -- 交易对 (BTCUSDT/ETHUSDT)
interval TEXT NOT NULL, -- 周期 (1m/5m/15m/1h/4h/1d)
-- OHLCV
open NUMERIC(20,8) NOT NULL,
high NUMERIC(20,8) NOT NULL,
low NUMERIC(20,8) NOT NULL,
close NUMERIC(20,8) NOT NULL,
volume NUMERIC(20,8) NOT NULL,
-- 扩展字段
quote_volume NUMERIC(20,8), -- 成交额(计价币种)
taker_buy_base_vol NUMERIC(20,8), -- 主动买入成交量
taker_buy_quote_vol NUMERIC(20,8), -- 主动买入成交额
trade_count INTEGER, -- 成交笔数
is_closed BOOLEAN DEFAULT TRUE, -- K 线是否已关闭
-- 元数据
created_at TIMESTAMPTZ DEFAULT NOW(), -- 记录创建时间
updated_at TIMESTAMPTZ DEFAULT NOW(), -- 最后更新时间
-- 分区键
-- time 是分区列,symbol + interval 是分布列
UNIQUE (time, exchange, symbol, interval)
);
-- 转换为 hypertable(按时间和空间分区)
SELECT create_hypertable(
'klines',
'time', -- 时间分区列
chunk_time_interval => INTERVAL '1 day', -- 每分区 1 天数据
partitioning_column => 'exchange', -- 空间分区列
number_partitions => 4, -- 4 个空间分区
if_not_exists => TRUE
);
-- ============================================
-- 2. 索引设计
-- ============================================
-- 查询最频繁:按交易对+周期+时间范围查
CREATE INDEX idx_klines_lookup
ON klines (exchange, symbol, interval, time DESC);
-- 回测查询:按交易对+周期+时间范围
CREATE INDEX idx_klines_backtest
ON klines (symbol, interval, time ASC);
-- 最新 K 线查询
CREATE INDEX idx_klines_latest
ON klines (exchange, symbol, interval, time DESC)
WHERE is_closed = TRUE;
-- ============================================
-- 3. 数据压缩策略
-- ============================================
-- 启用压缩(已关闭的 K 线自动压缩)
ALTER TABLE klines SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'exchange, symbol, interval',
timescaledb.compress_orderby = 'time DESC'
);
-- 自动压缩策略:K 线关闭 7 天后压缩
SELECT add_compression_policy('klines', INTERVAL '7 days', if_not_exists => TRUE);
-- ============================================
-- 4. 数据保留策略
-- ============================================
-- 1m K 线保留 30 天
SELECT add_retention_policy('klines', INTERVAL '30 days', if_not_exists => TRUE);
```
##### 2. 连续聚合(Continuous Aggregates
连续聚合是 TimescaleDB 最强大的功能之一 —— 自动从低周期 K 线聚合为高周期 K 线,无需手动维护。
```sql
-- ============================================
-- 5m K 线(从 1m 自动聚合)
-- ============================================
CREATE MATERIALIZED VIEW klines_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', time) AS time,
exchange,
symbol,
'5m'::TEXT AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
MIN(low) AS low,
LAST(close, time) AS close,
SUM(volume) AS volume,
SUM(quote_volume) AS quote_volume,
SUM(taker_buy_base_vol) AS taker_buy_base_vol,
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count) AS trade_count
FROM klines
WHERE interval = '1m'
GROUP BY time_bucket('5 minutes', time), exchange, symbol;
-- 自动刷新策略(每 1 分钟刷新最近 10 分钟数据)
SELECT add_continuous_aggregate_policy('klines_5m',
start_offset => INTERVAL '1 day',
end_offset => INTERVAL '10 minutes', -- 留 10 分钟给延迟数据
schedule_interval => INTERVAL '1 minute',
if_not_exists => TRUE
);
-- ============================================
-- 15m K 线
-- ============================================
CREATE MATERIALIZED VIEW klines_15m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('15 minutes', time) AS time,
exchange, symbol, '15m'::TEXT AS interval,
FIRST(open, time), MAX(high), MIN(low), LAST(close, time),
SUM(volume), SUM(quote_volume),
SUM(taker_buy_base_vol), SUM(taker_buy_quote_vol),
SUM(trade_count)
FROM klines WHERE interval = '1m'
GROUP BY time_bucket('15 minutes', time), exchange, symbol;
SELECT add_continuous_aggregate_policy('klines_15m',
start_offset => INTERVAL '2 days',
end_offset => INTERVAL '30 minutes',
schedule_interval => INTERVAL '5 minutes',
if_not_exists => TRUE
);
-- ============================================
-- 1h K 线
-- ============================================
CREATE MATERIALIZED VIEW klines_1h
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS time,
exchange, symbol, '1h'::TEXT AS interval,
FIRST(open, time), MAX(high), MIN(low), LAST(close, time),
SUM(volume), SUM(quote_volume),
SUM(taker_buy_base_vol), SUM(taker_buy_quote_vol),
SUM(trade_count)
FROM klines WHERE interval = '1m'
GROUP BY time_bucket('1 hour', time), exchange, symbol;
SELECT add_continuous_aggregate_policy('klines_1h',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '5 minutes',
if_not_exists => TRUE
);
-- ============================================
-- 1d K 线(含周线、月线可在查询时用 time_bucket
-- ============================================
CREATE MATERIALIZED VIEW klines_1d
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', time) AS time,
exchange, symbol, '1d'::TEXT AS interval,
FIRST(open, time), MAX(high), MIN(low), LAST(close, time),
SUM(volume), SUM(quote_volume),
SUM(taker_buy_base_vol), SUM(taker_buy_quote_vol),
SUM(trade_count)
FROM klines WHERE interval = '1m'
GROUP BY time_bucket('1 day', time), exchange, symbol;
SELECT add_continuous_aggregate_policy('klines_1d',
start_offset => INTERVAL '7 days',
end_offset => INTERVAL '2 hours',
schedule_interval => INTERVAL '1 hour',
if_not_exists => TRUE
);
```
**查询示例**
```sql
-- 查询 BTC 最近 100 根 1h K 线(自动走连续聚合)
SELECT * FROM klines_1h
WHERE symbol = 'BTCUSDT'
AND exchange = 'binance'
ORDER BY time DESC
LIMIT 100;
-- 查询 BTC 1h K 线的周线
SELECT
time_bucket('1 week', time) AS week,
FIRST(open, time), MAX(high), MIN(low), LAST(close, time),
SUM(volume)
FROM klines_1h
WHERE symbol = 'BTCUSDT'
GROUP BY week
ORDER BY week DESC;
-- 统计最近 7 天各币种波动率
SELECT
symbol,
MAX(high) / MIN(low) - 1 AS volatility
FROM klines_1h
WHERE time > NOW() - INTERVAL '7 days'
GROUP BY symbol
ORDER BY volatility DESC;
```
##### 3. 空间与时间分区原理
```
hypertable: klines
┌─────────────────────────────────────────────────────┐
│ 时间线 │
│ │
│ chunk_1 (2024-01-01) chunk_2 (2024-01-02) ... │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ exchange=A │ │ exchange=A │ │
│ │ exchange=B │ │ exchange=B │ │
│ │ exchange=C │ │ exchange=C │ │
│ │ exchange=D │ │ exchange=D │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
│ 每个 chunk 内部按 exchange 做空间分区 │
│ chunk 大小 ≈ 1 天数据(自动调整) │
└─────────────────────────────────────────────────────┘
```
| 参数 | 值 | 说明 |
|------|----|------|
| `chunk_time_interval` | `1 day` | 每个 chunk 存储 1 天数据,约 100~500MB |
| `number_partitions` | 4 | 按交易所分区,4 个空间分区 |
| 空间分区列 | `exchange` | Binance/OKX/Bybit 等分布在不同分区 |
##### 4. TypeScript 写入实现
```typescript
// data/src/storage/timescaledb.ts
import { Pool } from 'pg';
interface KlineRecord {
time: Date;
exchange: string;
symbol: string;
interval: string;
open: number;
high: number;
low: number;
close: number;
volume: number;
quoteVolume: number;
takerBuyBaseVol: number;
takerBuyQuoteVol: number;
tradeCount: number;
isClosed: boolean;
}
export class TimescaleDBStorage {
private pool: Pool;
private batchBuffer: KlineRecord[] = [];
private batchSize: number;
private flushInterval: number;
constructor(config: {
connectionString: string;
batchSize?: number; // 批量写入条数,默认 500
flushIntervalMs?: number; // 最大等待时间,默认 1000ms
}) {
this.pool = new Pool({ connectionString: config.connectionString });
this.batchSize = config.batchSize ?? 500;
this.flushInterval = config.flushIntervalMs ?? 1000;
// 定时刷新缓冲区
setInterval(() => this.flush(), this.flushInterval);
}
/**
* 写入单条 K 线(加入缓冲区,批量写入)
*/
async write(kline: KlineRecord): Promise<void> {
this.batchBuffer.push(kline);
if (this.batchBuffer.length >= this.batchSize) {
await this.flush();
}
}
/**
* 批量刷新缓冲区
*/
async flush(): Promise<void> {
if (this.batchBuffer.length === 0) return;
const batch = this.batchBuffer.splice(0, this.batchSize);
const client = await this.pool.connect();
try {
// 使用 UNLOGGED 表写入时跳过 WAL 日志(提升写入性能)
await client.query('SET LOCAL timescaledb.enable_skip_scan = ON');
// 批量 UPSERT:已存在的 K 线只更新 close/high/low(增量更新)
const values = batch.map((k, i) => {
const offset = i * 14;
return `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4},
$${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8},
$${offset + 9}, $${offset + 10}, $${offset + 11}, $${offset + 12},
$${offset + 13}, $${offset + 14})`;
}).join(',');
const params = batch.flatMap(k => [
k.time, k.exchange, k.symbol, k.interval,
k.open, k.high, k.low, k.close,
k.volume, k.quoteVolume,
k.takerBuyBaseVol, k.takerBuyQuoteVol,
k.tradeCount, k.isClosed,
]);
await client.query(`
INSERT INTO klines (
time, exchange, symbol, interval,
open, high, low, close,
volume, quote_volume,
taker_buy_base_vol, taker_buy_quote_vol,
trade_count, is_closed
) VALUES ${values}
ON CONFLICT (time, exchange, symbol, interval) DO UPDATE SET
high = GREATEST(klines.high, EXCLUDED.high),
low = LEAST(klines.low, EXCLUDED.low),
close = EXCLUDED.close,
volume = klines.volume + EXCLUDED.volume,
trade_count = klines.trade_count + EXCLUDED.trade_count,
is_closed = EXCLUDED.is_closed,
updated_at = NOW()
`, params);
} finally {
client.release();
}
}
/**
* 查询 K 线
*/
async query(options: {
exchange: string;
symbol: string;
interval: string;
startTime: Date;
endTime: Date;
limit?: number;
}): Promise<KlineRecord[]> {
const result = await this.pool.query(`
SELECT * FROM klines
WHERE exchange = $1
AND symbol = $2
AND interval = $3
AND time >= $4
AND time <= $5
ORDER BY time ASC
LIMIT $6
`, [
options.exchange,
options.symbol,
options.interval,
options.startTime,
options.endTime,
options.limit ?? 1000,
]);
return result.rows;
}
async close(): Promise<void> {
await this.flush();
await this.pool.end();
}
}
```
##### 5. 写入性能优化策略
| 策略 | 说明 | 效果 |
|------|------|------|
| **批量写入** | 每 500 条或 1 秒批量写入一次 | 减少 99% 的数据库连接开销 |
| **UPSERT** | 用 `ON CONFLICT DO UPDATE` 处理同一 K 线的增量更新 | 避免重复写入 |
| **连接池** | `pg.Pool` 管理 10~20 个连接 | 控制并发,避免 PG 连接暴涨 |
| **压缩** | 关闭 7 天后自动压缩 | 存储空间减少 90%+ |
| **chunk 对齐** | chunk 按天分区,避免跨分区查询 | 查询性能提升 10x |
| **索引** | 精确的复合索引覆盖所有查询模式 | 避免全表扫描 |
##### 6. Python 读取实现(策略引擎侧)
```python
# engine/common/storage.py
import asyncpg
from datetime import datetime
from typing import Optional
class TimescaleDBReader:
"""Python 策略引擎侧的 K 线读取器"""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: Optional[asyncpg.Pool] = None
async def connect(self):
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=5,
max_size=20,
command_timeout=10,
)
async def get_klines(
self,
symbol: str,
interval: str,
start_time: datetime,
end_time: datetime,
exchange: str = "binance",
limit: int = 1000,
) -> list[dict]:
"""
获取 K 线数据(自动路由到连续聚合视图)
当 interval >= '1h' 时,自动使用连续聚合视图查询,
性能远优于扫描原始 1m 表。
"""
# 自动路由到对应的物化视图
view_map = {
"5m": "klines_5m", "15m": "klines_15m",
"30m": "klines_30m", "1h": "klines_1h",
"4h": "klines_4h", "1d": "klines_1d",
}
table = view_map.get(interval, "klines")
async with self.pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT
time,
open,
high,
low,
close,
volume,
quote_volume,
trade_count
FROM {table}
WHERE exchange = $1
AND symbol = $2
AND interval = $3
AND time >= $4
AND time <= $5
ORDER BY time ASC
LIMIT $6
""",
exchange, symbol, interval,
start_time, end_time, limit,
)
return [dict(row) for row in rows]
async def get_latest_kline(
self,
symbol: str,
interval: str,
exchange: str = "binance",
) -> Optional[dict]:
"""获取最新一根 K 线"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT * FROM klines
WHERE exchange = $1
AND symbol = $2
AND interval = $3
AND is_closed = TRUE
ORDER BY time DESC
LIMIT 1
""",
exchange, symbol, interval,
)
return dict(row) if row else None
async def get_ohlcv_batch(
self,
symbol: str,
interval: str,
limit: int = 200,
exchange: str = "binance",
) -> list[tuple]:
"""
获取 OHLCV 数组(直接用于 pandas DataFrame 或 TA-Lib 计算)
返回格式: [(timestamp, open, high, low, close, volume), ...]
"""
view_map = {
"5m": "klines_5m", "15m": "klines_15m",
"1h": "klines_1h", "4h": "klines_4h", "1d": "klines_1d",
}
table = view_map.get(interval, "klines")
async with self.pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT time, open, high, low, close, volume
FROM {table}
WHERE exchange = $1 AND symbol = $2 AND interval = $3
ORDER BY time DESC
LIMIT $4
""",
exchange, symbol, interval, limit,
)
# 正序返回
return [
(row['time'], row['open'], row['high'],
row['low'], row['close'], row['volume'])
for row in reversed(rows)
]
async def close(self):
if self.pool:
await self.pool.close()
```
##### 7. 磁盘空间估算
```sql
-- 单币种单交易所 1m K 线行数
-- 1天 = 1440 行,1月 ≈ 43200 行,1年 ≈ 525600 行
-- 无压缩时:
-- 1 行 ≈ 180 bytes
-- 10 币种 × 1 年 × 180 B = 945 MB
-- 启用压缩后(压缩比约 92%):
-- 10 币种 × 1 年 × 180 B × 8% ≈ 75 MB
-- 50 币种 × 1 年 ≈ 375 MB
```
| 数据量 | 未压缩 | 压缩后 (92%) | 连续聚合额外 |
|--------|--------|-------------|------------|
| 10 币种 / 1 年 | ~945 MB | ~75 MB | ~30 MB |
| 50 币种 / 1 年 | ~4.7 GB | ~375 MB | ~150 MB |
| 200 币种 / 1 年 | ~18.9 GB | ~1.5 GB | ~600 MB |
##### 8. Docker Compose 配置
```yaml
# docker-compose.yml
version: '3.8'
services:
timescaledb:
image: timescale/timescaledb:2-pg16
container_name: trade-timescaledb
restart: unless-stopped
ports:
- "5432:5432"
environment:
POSTGRES_DB: trade
POSTGRES_USER: trader
POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
volumes:
- timescaledb-data:/var/lib/postgresql/data
- ./data/init-db:/docker-entrypoint-initdb.d # 自动执行建表 SQL
command: >
-c shared_buffers=1GB
-c effective_cache_size=3GB
-c maintenance_work_mem=256MB
-c work_mem=64MB
-c wal_buffers=64MB
-c random_page_cost=1.1
-c effective_io_concurrency=200
-c max_connections=50
healthcheck:
test: ["CMD-SHELL", "pg_isready -U trader -d trade"]
interval: 10s
timeout: 5s
retries: 5
volumes:
timescaledb-data:
```
> **初始化脚本**:将上述建表 SQL 保存为 `data/init-db/001_init.sql`,容器首次启动时自动执行。
---
#### 推荐的 npm 包
| 包名 | 用途 |
|------|------|
| `ws` | WebSocket 客户端(轻量、高性能) |
| `ccxt` | 统一交易所 API(支持 100+ 交易所) |
| `pg` | PostgreSQL 客户端(TimescaleDB 兼容) |
| `ioredis` | Redis 客户端(支持集群、哨兵) |
| `zod` | 运行时数据校验,与 Python Pydantic 对应 |
| `pino` | 高性能结构化日志 |
| `rxjs` | 响应式编程,优雅处理数据流 |
| `bull` / `bullmq` | Redis 任务队列,用于定时采集任务 |
| `technicalindicators` | 技术指标计算(备选) |
---
### 2. 策略引擎 (`engine/`) — 🐍 Python
核心调度中心,负责策略的生命周期管理和信号分发。
#### 子模块划分
| 子模块 | 职责 | 关键技术点 |
| -------------- | ---------------------------------------------- | -------------------------------------- |
| **通用模块** | 策略基类、数据模型、日志、配置 | `engine/common/` 目录,基础类型定义 |
| **策略管理器** | 策略注册、启动、停止、热加载 | 插件化架构、动态导入、`importlib` |
| **信号分发器** | 将策略产生的交易信号分发到交易执行模块 | 事件总线、消息队列 |
| **回测引擎** | 使用历史数据模拟策略执行,评估收益、回撤等指标 | `vectorbt` / `backtrader`、事件驱动 |
| **参数优化器** | 网格搜索 / 贝叶斯优化策略参数 | `scipy.optimize``optuna`、并行计算 |
#### 策略基类设计
```python
from common import BaseStrategy, StrategyConfig, Signal
from common.models import Kline, Ticker, OrderBook
class MyStrategy(BaseStrategy):
"""策略示例 — 所有策略继承 BaseStrategy"""
strategy_type = "my_strategy"
async def on_kline(self, kline: Kline) -> Signal | None:
...
```
---
### 3. 交易执行模块 (`executor/`) — 🐍 Python
负责将策略信号转化为实际订单,管理交易所连接和仓位。虽然数据采集在 TypeScript 层完成,但下单操作仍需在 Python 侧执行,因为策略引擎和风控逻辑同在 Python 侧,减少跨语言通信延迟。
#### 子模块划分
| 子模块 | 职责 | 关键技术点 |
| ---------------- | ------------------------------------------------------ | ---------------------------------------- |
| **交易所适配器** | 封装不同交易所的 REST / WebSocket API,提供统一接口 | 适配器模式、限频控制、签名算法 |
| **订单管理器** | 下单、撤单、查询订单状态;支持限价单、市价单、条件单 | 订单状态机、幂等性、超时处理 |
| **仓位管理器** | 跟踪当前持仓、可用余额、未实现盈亏 | 实时同步、增量更新 |
| **执行算法** | TWAP、VWAP、冰山订单等高级执行算法(降低滑点) | 切片算法、时间加权 |
#### 交易所适配器接口
```python
class ExchangeAdapter(ABC):
"""交易所统一适配器接口"""
@abstractmethod
async def create_order(self, order: Order) -> OrderResult:
...
@abstractmethod
async def cancel_order(self, order_id: str) -> bool:
...
@abstractmethod
async def get_balance(self) -> dict[str, float]:
...
@abstractmethod
async def get_position(self, symbol: str) -> Position:
...
```
---
### 4. 风控模块 (`risk/`) — 🐍 Python
系统的安全防线,在交易前、交易中、交易后全链路控制风险。
| 功能 | 说明 |
| ------------------ | ------------------------------------------------------------------ |
| **资金管理** | 单笔下单量限制、总仓位比例限制、逐仓/全仓模式支持 |
| **最大回撤控制** | 当日/累计回撤超过阈值时自动暂停策略 |
| **异常检测** | 检测价格异常波动、交易所 API 异常、网络延迟过高 |
| **熔断机制** | 极端行情下自动停止所有交易,进入只平不开模式 |
| **订单频率限制** | 控制单位时间内的下单次数,防止过度交易 |
| **白名单/黑名单** | 限制可交易的币种和交易对 |
```python
class RiskManager:
async def check_order(self, order: Order, context: TradingContext) -> RiskResult:
"""下单前风控检查"""
checks = [
self._check_max_position,
self._check_order_value,
self._check_order_frequency,
self._check_drawdown_limit,
self._check_price_deviation,
]
for check in checks:
result = await check(order, context)
if not result.passed:
return result
return RiskResult(passed=True)
```
---
### 5. 监控告警模块 (`monitor/`)
保障系统运行的可观测性。
| 功能 | 说明 |
| ---------------- | -------------------------------------------------- |
| **指标收集** | 收集策略收益、胜率、夏普比率、最大回撤等绩效指标 |
| **系统监控** | CPU / 内存 / 网络延迟 / API 调用量 |
| **告警通知** | 通过钉钉、Telegram、飞书等发送告警 |
| **日志管理** | 结构化日志存储,支持按级别、模块检索 |
| **Web 仪表盘** | 实时展示资金曲线、持仓、交易记录、策略状态等 |
---
### 6. API 网关 (`api/`)
对外提供统一接口,支持浏览器端和移动端访问。
| 功能 | 说明 |
| ------------------ | ----------------------------------- |
| **REST API** | 策略管理、查询持仓、查看交易记录 |
| **WebSocket** | 实时推送行情、交易信号和通知 |
| **用户认证** | JWT 认证、API Key 管理 |
| **权限管理** | 多用户角色(管理员 / 只读用户) |
---
### 7. 配置与工具模块 (`common/`)
提供跨模块共享的基础设施。
| 模块 | 功能 |
| -------- | ---------------------------------------- |
| **配置** | 基于 `pydantic-settings` 的环境配置管理 |
| **日志** | 基于 `loguru` 的统一日志配置 |
| **工具** | 时间工具、重试装饰器、限流器、加解密工具 |
| **模型** | 共享的 Pydantic 数据模型(订单、K 线等) |
| **常量** | 交易所枚举、订单类型、时间周期等定义 |
---
## TypeScript 数据模块详解
### 核心架构
```
┌─────────────────────────────────────────────────────────────┐
│ TypeScript 数据模块 │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Binance │ │ OKX │ │ Bybit │ │
│ │ WS Adapter │ │ WS Adapter │ │ WS Adapter │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 统一行情流 (RxJS Observable) │ │
│ │ ┌───────────┐ ┌──────────┐ ┌─────────────────┐ │ │
│ │ │ ticker$ │ │ trade$ │ │ orderbook$ │ │ │
│ │ └─────┬─────┘ └────┬─────┘ └───────┬─────────┘ │ │
│ └────────┼──────────────┼────────────────┼─────────────┘ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ K 线合成器 (时间桶算法) │ │
│ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │
│ │ │ 1m │ │ 5m │ │ 15m │ │ 1h │ ... │ │
│ │ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │ │
│ └────────┼────────┼────────┼────────┼─────────────────┘ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 数据清洗 & 异常检测 │ │
│ └───────────────┬──────────────────┬───────────────────┘ │
│ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────────┐ │
│ │ TimescaleDB 写入器 │ │ Redis Pub/Sub 发布者 │ │
│ └─────────────────────┘ └───────────┬─────────────┘ │
└────────────────────────────────────────┼───────────────────┘
┌────────────────────┐
│ Python 策略引擎 │
│ (消费实时行情) │
└────────────────────┘
```
### 数据流生命周期
```
交易所 WS ──► TypeScript 采集器 ──► RxJS 管道
┌───────────┴───────────┐
│ │
▼ ▼
TimescaleDB 持久化 Redis Pub/Sub
│ │
▼ ▼
Grafana 查询 Python 策略消费
(历史分析) (实时交易)
```
### TypeScript 数据模块目录结构
```
data/
├── package.json
├── tsconfig.json
├── vitest.config.ts
├── src/
│ ├── index.ts # 模块入口
│ ├── config.ts # 配置(基于 zod 校验)
│ ├── logger.ts # 日志(pino
│ │
│ ├── exchanges/ # 交易所适配器
│ │ ├── types.ts # 交易所统一接口
│ │ ├── base.ts # 抽象基类
│ │ ├── binance.ts # Binance 实现
│ │ ├── okx.ts # OKX 实现
│ │ └── bybit.ts # Bybit 实现
│ │
│ ├── pipeline/ # 数据管道
│ │ ├── kline-synthesizer.ts # K 线合成器
│ │ ├── cleaner.ts # 数据清洗
│ │ └── transformer.ts # 数据转换
│ │
│ ├── storage/ # 数据存储
│ │ ├── timescaledb.ts # TimescaleDB 写入器
│ │ └── postgres.ts # PostgreSQL 客户端(业务数据)
│ │
│ ├── publisher/ # 数据发布
│ │ ├── redis.ts # Redis Pub/Sub
│ │ └── nats.ts # NATS(可选)
│ │
│ ├── types/ # 类型定义
│ │ ├── market.ts # Ticker, Kline, Trade…
│ │ ├── exchange.ts # 交易所枚举
│ │ └── enums.ts # 常量枚举
│ │
│ └── utils/ # 工具函数
│ ├── retry.ts # 重试逻辑
│ ├── rate-limiter.ts # 限频器
│ └── time.ts # 时间工具
└── tests/
├── exchanges/
├── pipeline/
└── storage/
```
### Python ↔ TypeScript 跨语言通信
```
┌──────────────────┐ Redis/NATS ┌──────────────────┐
│ 🟦 TypeScript │ ──────────────────────────► │ 🐍 Python │
│ 数据模块 │ Pub: "market:ticker" │ 策略引擎 │
│ │ Pub: "market:kline:1m" │ │
│ │ Pub: "market:trade" │ Sub: 消费行情 │
└──────────────────┘ └──────────────────┘
│ │
│ Redis Pub/Sub 频道设计 │
│ ───────────────────── │
│ market:ticker:{exchange}:{symbol} │
│ market:kline:{exchange}:{symbol}:{interval} │
│ market:trade:{exchange}:{symbol} │
│ system:heartbeat:{module} │
│ system:error:{module} │
│ │
└─────────────────────────────────────────────────┘
```
**数据序列化格式**:推荐使用 **Protocol Buffers****MessagePack**,比 JSON 更小、更快:
```typescript
// TypeScript 侧 - 发布
const data = { symbol: 'BTCUSDT', price: 50000, timestamp: Date.now() };
const encoded = msgpack.encode(data); // 比 JSON 小 30-50%
await redis.publish('market:ticker:binance:BTCUSDT', encoded);
```
```python
# Python 侧 - 消费
import msgpack
async with redis.pubsub() as pubsub:
await pubsub.subscribe('market:ticker:binance:BTCUSDT')
async for msg in pubsub.listen():
if msg['type'] == 'message':
data = msgpack.unpackb(msg['data'])
await strategy.on_ticker(Ticker(**data))
```
---
## 开发步骤
### 第一阶段:基础建设(第 1-2 周)
**目标**:搭建项目骨架,完成 TypeScript 数据模块的基础能力和 Python 项目基础。
| 步骤 | 任务 | 产出物 |
| ---- | ------------------------------------------------------------ | ----------------------------------- |
| 1.1 | 初始化 TypeScript 数据项目(`data/`),配置 `package.json` | 项目结构,ESLint + Prettier 配置 |
| 1.2 | 初始化 Python 项目,配置 `poetry` / `uv` 依赖管理 | `pyproject.toml`,项目目录结构 |
| 1.3 | 定义共享类型:TypeScript `types/` + Python `engine/common/models.py` | 双端对齐的数据模型 |
| 1.4 | 实现 TS 交易所适配器基类 + Binance 适配器(`data/src/exchanges/` | 统一接口 + WebSocket 连接 |
| 1.5 | 实现 TS 行情采集器,WebSocket 订阅实时 ticker 和 K 线 | 实时行情流入 |
| 1.6 | 实现 TS K 线合成器(`data/src/pipeline/kline-synthesizer.ts`) | 多周期 K 线实时合成 |
| 1.7 | 实现 TS 数据存储模块,写入 TimescaleDB | 数据持久化 |
| 1.8 | 实现 TS → Redis 数据发布 | 实时行情推送到消息队列 |
| 1.9 | Python 侧实现配置管理 + 日志 + Redis 订阅消费者 | 消费端就绪 |
| 1.10 | Docker Compose 编排基础服务(TimescaleDB / Redis | `docker-compose.yml` |
### 第二阶段:策略与回测(第 3-4 周)
**目标**:实现策略框架和回测引擎,能编写策略并进行回测验证。
| 步骤 | 任务 | 产出物 |
| ---- | ------------------------------------------------------------ | -------------------------------- |
| 2.1 | 实现策略基类(`engine/common/base.py`) | `BaseStrategy` 抽象基类 |
| 2.2 | 实现策略管理器(`engine/manager.py`),支持策略注册和生命周期 | 策略热加载、启动/停止控制 |
| 2.3 | 实现信号分发器(`engine/signals.py`) | 事件总线,策略到执行器的信号传递 |
| 2.4 | 实现回测引擎(`engine/backtest.py`) | 历史数据回测,收益曲线、回撤等指标 |
| 2.5 | 实现技术指标计算(`data/indicators.py`) | MA、MACD、RSI 等常用指标 |
| 2.6 | 编写示例策略 1:双均线交叉策略 | 可运行的策略示例 |
| 2.7 | 编写示例策略 2:网格交易策略 | 可运行的策略示例 |
| 2.8 | 实现参数优化器(`engine/optimizer.py`) | 基于 Optuna 的参数搜索 |
### 第三阶段:交易执行与风控(第 5-6 周)
**目标**:连接实盘交易通道,实现完整的交易执行流程和风控体系。
| 步骤 | 任务 | 产出物 |
| ---- | ------------------------------------------------------------ | --------------------------------- |
| 3.1 | 实现订单管理器(`executor/order_manager.py`) | 下单、撤单、订单状态更新 |
| 3.2 | 实现仓位管理器(`executor/position_manager.py`) | 实时仓位跟踪 |
| 3.3 | 实现交易状态机(`executor/state_machine.py`) | 订单生命周期管理(Created→Filled |
| 3.4 | 实现重试与容错机制(`executor/retry.py`) | 网络异常自动重试、幂等性保证 |
| 3.5 | 实现风控管理器(`risk/manager.py`) | 资金管理、仓位限制、熔断逻辑 |
| 3.6 | 实现风控规则引擎(`risk/rules.py`) | 可扩展的风控规则链 |
| 3.7 | 整合 TS 数据模块 → Redis → Python 策略引擎 → 交易执行的端到端流程 | 完整的跨语言运行流水线 |
| 3.8 | 编写集成测试,模拟实盘环境验证 | 测试覆盖率报告 |
### 第四阶段:API 与监控(第 7-8 周)
**目标**:完善系统的可观测性和对外接口。
| 步骤 | 任务 | 产出物 |
| ---- | ------------------------------------------------------------ | ----------------------------------- |
| 4.1 | 搭建 FastAPI 项目(`api/main.py`) | REST API 服务启动 |
| 4.2 | 实现 WebSocket 实时推送(`api/ws.py`) | 行情和交易数据实时推送 |
| 4.3 | 实现策略管理 API`api/routes/strategies.py`) | 策略启停、参数修改接口 |
| 4.4 | 实现交易记录查询 API(`api/routes/trades.py`) | 交易历史查询 |
| 4.5 | 集成 Prometheus 指标(`monitor/metrics.py`) | 关键业务指标暴露 |
| 4.6 | 配置 Grafana 仪表盘 | 可视化资金曲线、交易频率、策略绩效 |
| 4.7 | 实现告警通知集成(`monitor/alerts.py`) | Telegram / 钉钉通知 |
| 4.8 | JWT 用户认证与 API Key 管理 | 安全访问控制 |
### 第五阶段:优化与生产化(第 9-10 周)
**目标**:性能优化,系统加固,上线准备。
| 步骤 | 任务 | 产出物 |
| ---- | ------------------------------------------------------------ | -------------------------------- |
| 5.1 | 性能优化:数据库批量写入、连接池调优、TS/Python 异步性能分析 | 性能测试报告 |
| 5.2 | 增加单元测试与集成测试,目标覆盖率 > 80% | 测试报告 |
| 5.3 | 编写部署文档(`docs/deployment.md`) | 部署手册 |
| 5.4 | 编写使用文档(`docs/usage.md`) | 用户手册 |
| 5.5 | CI/CD 流水线配置(GitHub Actions — TS 和 Python 双管道 | 自动化测试、构建、部署 |
| 5.6 | 压力测试与稳定性测试 | 压测报告 |
| 5.7 | 生产环境部署与监控上線 | 线上系统 |
---
## 项目目录结构
```
trade/
├── pyproject.toml # Python 项目依赖与配置(Poetry / uv
├── package.json # TS 项目根依赖(可选 monorepo 管理)
├── docker-compose.yml # 基础服务编排
├── Dockerfile # 应用容器化
├── .env.example # 环境变量模板
├── .gitignore
├── README.md
├── config/ # Python 配置文件
│ ├── __init__.py
│ ├── settings.py # pydantic-settings 全局配置
│ └── strategies/ # 策略配置文件
│ ├── ma_cross.yaml
│ └── grid_trading.yaml
├── data/ # 🟦 TypeScript 数据模块
│ ├── package.json
│ ├── tsconfig.json
│ ├── vitest.config.ts
│ ├── src/
│ │ ├── index.ts # 模块入口
│ │ ├── config.ts # 配置
│ │ ├── exchanges/ # 交易所适配器
│ │ │ ├── types.ts
│ │ │ ├── base.ts
│ │ │ ├── binance.ts
│ │ │ ├── okx.ts
│ │ │ └── bybit.ts
│ │ ├── pipeline/ # 数据管道
│ │ │ ├── kline-synthesizer.ts
│ │ │ ├── cleaner.ts
│ │ │ └── transformer.ts
│ │ ├── storage/ # 数据存储
│ │ │ ├── timescaledb.ts # TimescaleDB K 线写入
│ │ │ └── postgres.ts # PostgreSQL 其他数据
│ │ ├── publisher/ # 数据发布
│ │ │ ├── redis.ts
│ │ │ └── nats.ts
│ │ ├── types/ # 类型定义
│ │ │ ├── market.ts
│ │ │ ├── exchange.ts
│ │ │ └── enums.ts
│ │ └── utils/
│ │ ├── retry.ts
│ │ ├── rate-limiter.ts
│ │ └── time.ts
│ └── tests/
├── engine/ # 🐍 Python 策略引擎
│ ├── __init__.py
│ ├── env.yaml # 引擎环境配置
│ ├── common/ # 引擎通用模块
│ │ ├── __init__.py
│ │ ├── base.py # BaseStrategy 基类
│ │ ├── models.py # 数据模型(Kline/Ticker/Trade/OrderBook
│ │ └── logger.py # 结构化日志
│ ├── manager.py # 策略管理器
│ ├── signals.py # 信号分发器
│ ├── backtest.py # 回测引擎
│ └── optimizer.py # 参数优化器
├── executor/ # 🐍 Python 交易执行模块
│ ├── __init__.py
│ ├── exchange.py # 交易所适配器(基类+实现)
│ ├── order_manager.py # 订单管理器
│ ├── position_manager.py # 仓位管理器
│ ├── state_machine.py # 订单状态机
│ └── retry.py # 重试与容错
├── risk/ # 🐍 Python 风控模块
│ ├── __init__.py
│ ├── manager.py # 风控管理器
│ └── rules.py # 风控规则
├── monitor/ # 监控告警模块
│ ├── __init__.py
│ ├── metrics.py # Prometheus 指标
│ ├── alerts.py # 告警通知
│ └── dashboard.py # 仪表盘数据聚合
├── api/ # API 网关
│ ├── __init__.py
│ ├── main.py # FastAPI 入口
│ ├── ws.py # WebSocket 处理
│ ├── auth.py # 用户认证
│ └── routes/ # 路由
│ ├── __init__.py
│ ├── strategies.py
│ ├── trades.py
│ └── account.py
├── strategies/ # 🐍 Python 策略实现
│ ├── __init__.py
│ ├── ma_cross.py # 双均线交叉策略
│ ├── grid_trading.py # 网格交易策略
│ └── arbitrage.py # 套利策略(可选)
├── common/ # 🐍 Python 公共基础设施(跨模块共享配置、工具等)
│ ├── __init__.py
│ ├── config.py # 全局配置
│ ├── constants.py # 常量定义
│ └── utils.py # 工具函数
├── tests/ # 🐍 Python 测试
│ ├── __init__.py
│ ├── data/
│ ├── engine/
│ ├── executor/
│ ├── risk/
│ └── conftest.py # pytest fixtures
└── docs/ # 文档
├── deployment.md
├── usage.md
└── api_reference.md
```
---
## 部署与运维
### 服务组件依赖
```
┌──────────────────┐ ┌──────────────┐ ┌─────────────┐
│ TS 数据模块 │ │ PostgreSQL │ │ Grafana │
│ (Bun 进程) │ │ (业务数据) │ │ (可视化) │
└──────┬───────────┘ └──────────────┘ └──────┬──────┘
│ │
│ ┌──────────────┐ │
├────────────▶ TimescaleDB │◀────────────┤
│ │ (时序存储) │ │
│ └──────────────┘ │
│ │
│ ┌──────────────┐ │
├────────────▶ Redis ├─────────────┤
│ │ (队列/缓存) │ │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
└────────────▶ Python 引擎 │ │
│ (策略+执行) │ │
└──────────────┘ │
```
### 容器化方案
```dockerfile
# Dockerfile.data — TypeScript 数据模块
FROM node:20-alpine AS builder
WORKDIR /app
COPY data/package*.json ./
RUN npm ci
COPY data/ .
RUN npm run build
FROM node:20-alpine AS runner
WORKDIR /app
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/node_modules ./node_modules
CMD ["node", "dist/index.js"]
```
```dockerfile
# Dockerfile.engine — Python 业务引擎
FROM python:3.11-slim
WORKDIR /app
COPY pyproject.toml poetry.lock ./
RUN pip install poetry && poetry install --no-dev
COPY . .
CMD ["poetry", "run", "python", "-m", "engine.main"]
```
### 关键运维指标
- **API 延迟**:单次下单 < 500ms(网络延迟不计)
- **数据延迟**:行情从交易所到 Redis < 500msTypeScript 采集)
- **系统可用性**:核心交易链路 > 99.9%
- **策略执行周期**:秒级 tick 策略 < 100ms / 次
- **跨语言通信延迟**Redis Pub/Sub < 1ms(同机部署)
---
## 注意事项与风险提示
> ⚠️ **风险声明**:数字货币交易具有高风险,本系统仅为技术实现,不构成任何投资建议。使用前请充分了解风险。
### 技术注意事项
1. **API 限频**:各交易所均有严格 API 限频,需实现本地限流器
2. **WebSocket 重连**:网络波动导致断线,需实现心跳检测和自动重连(指数退避策略)
3. **数据一致性**:订单状态需轮询确认,避免因异步回调丢失状态
4. **时间同步**:服务器需 NTP 同步,防止时间偏差导致下单失败
5. **日志安全**:避免记录 API Secret 等敏感信息到日志
6. **灰度发布**:新策略先在模拟盘运行,验证通过后方可接入实盘
7. **资金隔离**:不同策略的资金账户严格隔离,防止交叉影响
8. **跨语言类型一致性**TypeScript 的 `zod` schema 和 Python 的 `Pydantic` model 需保持同步,建议用自动化脚本生成或共享 schema 文件
### 开发建议
1. **模块化开发**:各模块独立开发、独立测试,通过接口契约协作
2. **代码质量**TS 用 `ESLint` + `Prettier`Python 用 `ruff` + `mypy`
3. **版本控制**:配置文件和策略参数不应硬编码,使用 `.env` 或配置文件
4. **日志先行**:先写好日志,再写业务逻辑,便于调试
5. **单元测试**:核心逻辑(风控、订单状态机、K 线合成器)必须有单元测试覆盖
6. **模拟盘先行**:实盘前至少 2 周模拟盘验证策略稳定性
7. **本地开发**Python 和 TS 模块分别 `npm run dev` / `poetry run dev` 独立开发调试
8. **schema 驱动**:考虑使用 Protocol Buffers 的 `.proto` 文件同时生成 TS 和 Python 代码,保证类型完全一致
---
## 许可证
MIT License