- aggregate.ts: openTime % ms === 0 → (openTime + 60_000) % ms === 0 确保桶内最后一根1m K线到达(桶已闭合)时才触发聚合刷新 - AGENTS.md: 新增 K线 time 字段与 time_bucket 边界说明 - ARCHITECTURE.md: 新增 §4.3 聚合时间边界与刷新时机 小节 - websocket-realtime.md: 同步更新 checkAndRefresh 判定公式和表格
48 KiB
Trade Data Module — 技术架构说明
模块定位:数字货币量化交易系统的数据层,负责多交易所行情采集、K 线合成、时序数据持久化与实时发布。
运行时:Bun | 语言:TypeScript 5.x | 数据库:PostgreSQL + TimescaleDB | ORM:TypeORM + @timescaledb/typeorm
目录
- 技术选型与依赖矩阵
- 整体架构
- 目录结构
- 核心模块设计
- 数据流生命周期
- TypeORM + TimescaleDB 集成细节
- 配置管理策略
- 日志与可观测性
- 错误处理与容错
- 性能考量
- 开发工作流
- 风险提示
1. 技术选型与依赖矩阵
1.1 为什么选择 Bun + TypeScript?
| 决策点 | 选型 | 理由 |
|---|---|---|
| 运行时 | Bun | 原生支持 TypeScript,零配置运行 .ts 文件;启动速度比 Node.js 快 4×;兼容 Node.js ecosystem |
| 语言 | TypeScript 5.x | 编译期类型检查,在数据管道中消除 undefined / 类型不匹配导致的运行时崩溃 |
| 包管理 | bun install |
与 Bun 运行时一体,安装速度比 npm 快 25× |
| 测试 | Vitest | 原生 ESM 支持,与 Bun 兼容良好(已有 4 个测试文件,覆盖率建设中) |
1.2 依赖矩阵
| 类别 | 依赖包 | 版本 | 用途 | 状态 |
|---|---|---|---|---|
| ORM 框架 | typeorm |
^1.0 | 关系数据(交易对配置、交易所元数据)的 ORM 映射 | ✅ 使用中 |
| 时序 ORM | @timescaledb/typeorm |
^0.0.1 | Hypertable 实体装饰器(仅实体定义,不负责 DDL 迁移) | ✅ 使用中 |
| PG 驱动 | pg |
^8.21 | TypeORM 底层 PG 连接 | ✅ 使用中 |
| 交易所 SDK | binance |
^3.5 | Binance 官方 SDK:MainClient(现货)、USDMClient(合约)、WebSocket | ✅ 使用中 |
| 日志 | pino |
^10.3 | 高性能结构化日志(Bun 下吞吐 > 100k/s) | ✅ 使用中 |
| YAML 解析 | yaml |
^2.9 | 解析项目根目录 env.yaml 配置文件 | ✅ 使用中 |
| 消息总线 | node:events |
内建 | EventEmitter 类型安全封装(utils/bus.ts),模块间解耦通信 |
✅ 使用中 |
| Redis | ioredis |
^5.11 | Pub/Sub 行情发布(设计阶段,尚未集成) | ⏳ 远期 |
| 统一交易所 | ccxt |
^4.5 | 多交易所统一 API(设计阶段,当前仅 Binance) | ⏳ 远期 |
| 通用 WS | ws |
^8.21 | 非 Binance 交易所 WebSocket(设计阶段) | ⏳ 远期 |
1.3 TypeORM + TimescaleDB 分工边界
┌─────────────────────────────────────────────────────────────┐
│ PostgreSQL 实例 │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ TypeORM 管理域(关系数据) │ │
│ │ · exchanges(交易所配置) │ │
│ │ · trading_pairs(交易对配置) │ │
│ │ → Migration: init-db SQL 脚本 │ │
│ └─────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ @timescaledb/typeorm 管理域(时序) │ │
│ │ · klines(1m K 线 hypertable) │ │
│ │ · klines_3m~1mon(12 层连续聚合视图) │ │
│ │ → DDL: init-db SQL 脚本 │ │
│ │ → 实体: @Hypertable 装饰器仅用于 │ │
│ │ TypeORM 运行时映射,不负责建表 │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
设计原则:TypeORM 管理结构稳定的关系数据;TimescaleDB hypertable 和连续聚合视图通过
db/init-db/SQL 脚本手动管理(synchronize: false)。@timescaledb/typeorm仅用于实体装饰器(@Hypertable/@TimeColumn),不参与 DDL 迁移。
2. 整体架构
┌──────────────────────────────────────────────────────────────────┐
│ Trade Data Module (Bun + TS) │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Binance REST │── backfill ──────→│ TimescaleDB │ │
│ │ (官方 SDK) │ │ Klines (1m) │ │
│ │ · MainClient │ └────────┬─────────┘ │
│ │ · USDMClient │ │ │
│ └──────────────────┘ │ │
│ │ │
│ ┌──────────────────┐ Bus Events (node:events) │ │
│ │ Binance WS │── kline:update ────→ upsert─┘ │
│ │ (官方 SDK) │ ↓ │
│ └────────┬─────────┘ kline:saved ────→ 连续聚合刷新 │
│ │ │
│ │ pair:ready ← backfill 完成后触发 │
│ │ ws:disconnected / ws:connected / ws:stale │
│ │ │
│ ┌────────┴──────────────────────────┐ │
│ │ run/start.ts(编排层) │ │
│ │ 注册 bus 监听 → backfillKlines() │ │
│ │ → WS 订阅 → 入库 → 聚合刷新 │ │
│ └───────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────┐ │
│ │ TypeORM 实体(关系数据) │ │
│ │ Exchange / TradingPair │ │
│ └───────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────┐ │
│ │ rest-registry.ts │ │
│ │ fetchKlines() 根据 type 自动路由 │ │
│ │ spot → binance, um/cm → futures │ │
│ └───────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
3. 目录结构
data/
├── ARCHITECTURE.md # ← 本文件:技术架构说明
├── package.json # 依赖与脚本
├── tsconfig.json # TypeScript 配置
├── bun.lock # Bun 依赖锁定文件
│
├── run/ # 启动文件
│ ├── start.ts # 模块入口:注册 bus 事件监听 → 回补 → WS 订阅 → 入库 → 聚合刷新 → 优雅关闭
│ └── exchange.ts # 数据补全入口:委托 backfillKlines() 逐交易对回补
│
├── config/ # 中心化配置模块
│ ├── index.ts # 配置加载与分组导出(pgsql / redis / exchange / logging)
│ └── validators.ts # 零依赖运行时校验(env.yaml → EnvConfig)
│
├── db/ # 数据库层
│ ├── data-source.ts # TypeORM DataSource 配置(synchronize: false)
│ ├── entities/ # ORM 实体
│ │ ├── index.ts # 实体统一导出
│ │ ├── common.entity.ts # 公共基类(UUID 主键 + created_at/updated_at)
│ │ ├── exchange.entity.ts # 交易所(binance/okx/bybit)
│ │ ├── trading-pair.entity.ts # 交易对(含 type 列区分 spot/um/cm)
│ │ └── kline.entity.ts # TimescaleDB K 线 hypertable(1m,5 列复合 PK)
│ └── init-db/ # 数据库初始化 SQL(DDL、连续聚合视图、种子数据)
│
├── exchanges/ # 交易所适配器
│ ├── base.ts # BaseRestClient / BaseWsClient 抽象基类
│ ├── constants.ts # KLINE_INTERVAL_MS 等常量
│ ├── index.ts # Re-export 入口
│ ├── rest-registry.ts # REST 客户端注册表 + fetchKlines() 自动路由
│ ├── ws-manager.ts # WS 实例池管理 + watchKline/watchKlines
│ └── binance/
│ ├── rest.ts # BinanceRestClient(现货)+ BinanceFuturesRestClient(合约)
│ └── ws.ts # BinanceWsClient(WebSocket 订阅)
│
├── service/ # 业务服务层
│ ├── backfill.ts # 回补编排:遍历交易对,逐对拉取→入库→更新时间戳
│ ├── kline.ts # upsertOrUpdateKlines(批量 UPSERT,5 列冲突键)
│ ├── pair.ts # getAllPairs / getPairLastBackfillTime / updatePairWsTime
│ └── aggregate.ts # checkAndRefresh(连续聚合刷新触发)
│
├── types/ # 共享类型定义
│ ├── base.ts # Kline / Ticker / Trade / FetchKlinesParams 等接口
│ └── kline.ts # PairType / KlineInterval 等枚举
│
├── utils/ # 工具函数
│ ├── index.ts # getNowMinuteMS / getPrevMinuteMS / wait
│ ├── bus.ts # TypedEventBus(node:events 类型安全封装)
│ └── logger.ts # Pino 日志实例
│
└── tests/ # 测试(部分就绪,覆盖率建设中)
4. 核心模块设计
4.1 配置模块 config/
加载流程:
项目根目录 env.yaml → parseYaml() 解析 → validateConfig() 零依赖校验
→ 按职责分组导出: pgsql / redis / logging
| 导出对象 | 字段 | 来源 | 说明 |
|---|---|---|---|
pgsql |
host, port, database, user, password, max, idleTimeoutMillis, connectionTimeoutMillis | env.yaml → 校验 + 硬编码 |
pg Pool 配置,TypeORM DataSource 复用;max/idleTimeoutMillis/connectionTimeoutMillis 硬编码 |
redis |
url, publishEnabled, channelPrefix, retryDelayBaseMs, maxRetries | env.yaml → 校验 + 硬编码 |
ioredis 连接与 Pub/Sub 配置;channelPrefix/retryDelayBaseMs/maxRetries 硬编码 |
logging |
level, nodeEnv, pretty | env.yaml → 校验 |
Pino 日志配置;pretty 由 node_env === "development" 推导 |
设计原则:仅数据库连接、Redis 连接、日志级别等运维配置通过
env.yaml暴露。配置源为项目根目录的env.yaml,TypeScript/Python 模块共享同一份配置。
校验定义位置:config/validators.ts — 零依赖运行时校验(assertString / assertPort / assertBoolean / assertEnum),fail-fast 原则。
配置分组详情:
// —— pgsql: 数据库连接 ——
export const pgsql = {
host: rawConfig.db.host, // env.yaml default: "localhost"
port: rawConfig.db.port, // env.yaml default: 5432
database: rawConfig.db.name, // env.yaml default: "trade"
user: rawConfig.db.user, // env.yaml default: "trader"
password: rawConfig.db.password, // env.yaml required
max: 20, // 硬编码:连接池上限
idleTimeoutMillis: 30000, // 硬编码:空闲超时 30s
connectionTimeoutMillis: 5000, // 硬编码:连接获取超时 5s
} as const;
// —— redis: 消息队列 ——
export const redis = {
url: rawConfig.redis.url, // env.yaml default: "redis://localhost:6379"
publishEnabled: rawConfig.redis.publish_enabled, // env.yaml default: true
channelPrefix: "trade", // 硬编码
retryDelayBaseMs: 1000, // 硬编码
maxRetries: 10, // 硬编码
} as const;
// —— logging: 日志 ——
export const logging = {
level: rawConfig.logging.level, // env.yaml default: "debug"
nodeEnv: rawConfig.logging.node_env, // env.yaml default: "development"
pretty: rawConfig.logging.node_env === "development",
} as const;
4.2 TypeORM 数据源 db/data-source.ts
// data/db/data-source.ts
import { DataSource } from "typeorm";
import { pgsql } from "../config";
import * as entities from "./entities";
export const AppDataSource = new DataSource({
type: "postgres",
host: pgsql.host, port: pgsql.port,
database: pgsql.database,
username: pgsql.user, password: pgsql.password,
// 所有实体(关系 + 时序)通过 db/entities/index.ts 统一注册
entities: [...Object.values(entities)],
synchronize: true, // 开发环境自动同步 schema
migrations: [__dirname + "/migrations/*.{ts,js}"],
extra: {
max: pgsql.max,
idleTimeoutMillis: pgsql.idleTimeoutMillis,
connectionTimeoutMillis: pgsql.connectionTimeoutMillis,
},
});
await AppDataSource.initialize();
实体注册原则
- 关系实体(
Exchange,TradingPair):继承CommonBaseEntity(UUID 主键 + created_at/updated_at),标准@Entity()装饰器 - 时序实体(
Kline):不继承CommonBaseEntity,使用@Hypertable()+@TimeColumn()自动创建 TimescaleDB hypertable - 统一入口:所有实体通过
db/entities/index.ts导出,...Object.values(entities)自动注册
4.3 TimescaleDB K 线实体
唯一 K 线实体:
kline.entity.ts(1m)。高周期 K 线(3m / 5m / 15m / 30m / 1h / 2h / 4h / 6h / 8h / 1d / 1w / 1mon)由 TimescaleDB 连续聚合视图从 1m 基表自动生成,不创建独立实体。聚合刷新由service/aggregate.ts按时间桶对齐触发。
kline.entity.ts — 1m K 线 Hypertable
// data/db/entities/kline.entity.ts
import { Hypertable, TimeColumn } from "@timescaledb/typeorm";
import { Entity, PrimaryColumn, Column, CreateDateColumn, UpdateDateColumn } from "typeorm";
import type { KlineInterval, PairType } from '../../types';
@Hypertable({
compression: {
compress: true,
compress_orderby: "time DESC",
compress_segmentby: "exchange, symbol, type", // ← 含 type
policy: { schedule_interval: "30 days" }, // 30 天后压缩
},
})
@Entity("klines")
export class Kline {
/** 交易所标识(binance) */
@PrimaryColumn("text")
exchange!: string;
/** 交易对符号(如 BTCUSDT) */
@PrimaryColumn("text")
symbol!: string;
/** 交易对类型(spot / um / cm)— 五列复合主键之一 */
@PrimaryColumn("text", { default: 'spot' })
type!: PairType;
/** K 线周期(1m) */
@PrimaryColumn("text")
interval!: KlineInterval;
/** K 线开盘时间(UTC)— TimescaleDB 时间分区键 */
@TimeColumn()
@PrimaryColumn("timestamptz")
time!: Date;
// OHLCV — NUMERIC(20,8)
@Column("numeric", { precision: 20, scale: 8 })
open!: number;
// ... high / low / close / volume 同
// 扩展字段
@Column("numeric", { precision: 20, scale: 8, nullable: true })
quote_volume?: number;
@Column("numeric", { precision: 20, scale: 8, nullable: true })
taker_buy_base_vol?: number;
@Column("numeric", { precision: 20, scale: 8, nullable: true })
taker_buy_quote_vol?: number;
@Column("integer", { nullable: true })
trade_count?: number;
@Column("boolean", { default: true })
is_closed!: boolean;
@CreateDateColumn({ type: "timestamptz" })
createdAt!: Date;
@UpdateDateColumn({ type: "timestamptz" })
updatedAt!: Date;
}
复合主键设计
5 列:(exchange, symbol, type, interval, time)。type 列保证相同 symbol 的现货与合约 K 线不会主键冲突。
BTCUSDT + spot → klines
BTCUSDT + um → klines ← 同一 symbol,不同 type,PK 不冲突
为什么用
type列而非.P后缀?
.P后缀type字段symbol 语义 污染标识符,BTCUSDT.P 不是交易对名称 symbol 与 Binance 官方一致 查询 WHERE symbol LIKE '%.P'不走索引WHERE type = 'um'索引友好API 调用 每次 strip 后缀 symbol 原样传入,零变换 扩展 Coin-M 需要 _PERP新后缀规则加 'cm'枚举值即可
聚合时间边界与刷新时机
time 列语义:存储交易所原始 openTime(K 线开盘时间的 Unix 毫秒时间戳),不使用系统接收时间。这是连续聚合正确性的前提——回补的 3 天前 K 线若用"当前系统时间"写入,time_bucket 会将其分入错误的桶。
time_bucket 左闭右开:time_bucket('5 minutes', time) 将 time 向下取整到 5 分钟边界,桶区间为 [start, start + interval)。
00:00 ─┐
00:01 │
00:02 │ 5m 桶 [00:00, 00:05)
00:03 │ FIRST(open, time) = 00:00 的开盘价
00:04 ─┘ LAST(close, time) = 00:04 的收盘价
00:05 ─┐ 下一个 5m 桶 [00:05, 00:10)
...
聚合刷新触发时机:当桶内最后一根 1m K 线到达(桶已闭合)时触发,而非桶的第一分钟。判定条件 (openTime + 60_000) % period_ms === 0,即当前分钟结束边界刚好整除聚合周期。
| 聚合周期 | 桶范围 | 触发 1m K 线 | 判定 |
|---|---|---|---|
| 5m | [00:00, 00:05) | 00:04 | (00:04 + 60s) % 300s = 0 ✅ |
| 15m | [00:00, 00:15) | 00:14 | (00:14 + 60s) % 900s = 0 ✅ |
| 1h | [00:00, 01:00) | 00:59 | (00:59 + 60s) % 3600s = 0 ✅ |
为什么不是
openTime % period_ms === 0:该条件在桶的第一分钟(如 00:00)触发,此时桶内仅 1 根 K 线,其他 4 根尚未到达,刷出的聚合 K 线不完整。延迟到桶闭合触发可保证聚合数据完整性。
TimescaleDB 配置
| 配置项 | 值 | 说明 |
|---|---|---|
chunk_time_interval |
7 days | ~450 chunks / 1000万行,查询剪枝与元数据管理平衡 |
compress_segmentby |
exchange, symbol, type |
同交易对+类型聚合压缩(interval 固定 1m 无需分段) |
compress_orderby |
time DESC |
查询通常按时间降序 |
compress_after |
30 days | 30 天前数据自动列式压缩(压缩率 ~92%) |
DDL 管理:
synchronize: false,不依赖 TypeORM 自动建表。hypertable 创建、压缩策略、连续聚合视图均由db/init-db/SQL 脚本管理。@timescaledb/typeorm仅用于实体装饰器映射。
4.4 关系数据实体 db/entities/
trading-pair.entity.ts
// data/db/entities/trading-pair.entity.ts
@Entity("trading_pairs")
@Index(["exchange", "symbol", "type"], { unique: true }) // 同交易所下 symbol + type 唯一
@Index(["active"])
export class TradingPair extends CommonBaseEntity {
@ManyToOne(() => Exchange, { nullable: false })
@JoinColumn({ name: "exchange_id" })
exchange!: Exchange;
@Column("varchar", { length: 20 })
symbol!: string; // "BTCUSDT"
@Column("text", { default: 'spot' })
type!: PairType; // "spot" | "um" | "cm" — 区分现货/合约
@Column("varchar", { length: 10 })
base_asset!: string; // "BTC"
@Column("varchar", { length: 10 })
quote_asset!: string; // "USDT"
@Column("integer", { default: 10 })
price_precision!: number;
@Column("integer", { default: 10 })
quantity_precision!: number;
@Column("numeric", { precision: 32, scale: 8, nullable: true })
min_qty?: number;
@Column("numeric", { precision: 32, scale: 8, nullable: true })
step_size?: number;
@Column("numeric", { precision: 32, scale: 8, nullable: true })
min_notional?: number;
@Column("boolean", { default: true })
active!: boolean;
/** 历史 K 线最后补全时间(UTC)。新交易对默认为 epoch 起点,触发全量回补 */
@Column("timestamptz", { default: () => "to_timestamp(0)" })
last_backfill_time!: Date;
/** WebSocket 最近一次写入的 1m K 线 openTime(UTC) */
@Column("timestamptz", { default: () => "to_timestamp(0)" })
last_ws_time!: Date;
@Column("text", { nullable: true })
notes?: string;
}
实体 ER 关系
┌─────────────┐ ┌──────────────────┐
│ Exchange │──1:N──│ TradingPair │
│ │ │ │
│ id (PK) │ │ id (PK) │
│ name (UQ) │ │ exchange_id (FK) │
│ label │ │ symbol │
│ enabled │ │ type ← 区分现货合约│
│ config │ │ base_asset │
└─────────────┘ │ quote_asset │
│ last_backfill_time│
│ last_ws_time │
│ active │
└──────────────────┘
4.5 交易所适配器 exchanges/
REST 客户端
抽象基类 base.ts:提供限流(throttle)和配置注入,子类只需实现 fetchKlines()。
// data/exchanges/base.ts
export abstract class BaseRestClient {
abstract readonly exchange: string;
protected readonly config: RestClientConfig;
protected async throttle(key: string): Promise<void> { /* 冷却节流 */ }
abstract fetchKlines(params: FetchKlinesParams): Promise<Kline[]>;
}
Binance 实现 binance/rest.ts:基于 Binance 官方 SDK(MainClient + USDMClient),convertBinanceKline() 将 SDK 12 元组转为标准化 Kline。
// data/exchanges/binance/rest.ts
import { MainClient, USDMClient } from "binance";
export function convertBinanceKline(
raw: BinanceRestKline, symbol: string, interval: KlineInterval, type: PairType,
): Kline {
return {
exchange: "binance", // 现货和合约统一为 "binance",通过 type 列区分
symbol, type, interval,
openTime, closeTime,
open: String(open), high: String(high), // ... OHLCV 均转 string 保持精度
isClosed: true,
};
}
export class BinanceRestClient extends BaseRestClient {
readonly exchange = "binance";
private client = new MainClient({ api_key, api_secret }, { timeout: 3000 });
async fetchKlines(params) { /* getKlines → convertBinanceKline */ }
}
export class BinanceFuturesRestClient extends BaseRestClient {
readonly exchange = "binance_futures"; // 仅标识用途,入库 exchange 仍为 "binance"
private client = new USDMClient({ api_key, api_secret }, { timeout: 3000 });
async fetchKlines(params) { /* getKlines → convertBinanceKline(同构 12 元组) */ }
}
REST 客户端注册与自动路由 rest-registry.ts
// 注册表:交易所 ID → 客户端构造器
const registry = {
binance: () => new BinanceRestClient(),
binance_futures: () => new BinanceFuturesRestClient(),
};
// fetchKlines() 根据 type 自动路由,调用方零感知
export async function fetchKlines(params: FetchKlinesParams): Promise<Kline[]> {
const exchangeId = params.type === "spot"
? params.exchange // "binance"
: `${params.exchange}_futures`; // "binance_futures"
const client = createRestClient(exchangeId);
return client.fetchKlines(params);
}
设计要点:路由逻辑内聚在
fetchKlines()一处。调用方(backfill.ts、未来其他模块)只需传type字段,无需感知_futures后缀规则。新增 Coin-M('cm')时只需在fetchKlines()中补充一条路由,不影响任何调用方。
WebSocket 客户端
抽象基类 base.ts:
export abstract class BaseWsClient {
abstract watch(symbol: string): void;
abstract unwatch(symbol: string): void;
}
Binance 实现 binance/ws.ts:基于 Binance 官方 WebSocket SDK,收到闭合 K 线后标准化并通过 bus.emit("kline:update") 广播。
WS 实例池管理 ws-manager.ts:按 exchange:type 组合复用 WS 连接(同一 binance:spot 下多个 symbol 共享一个连接),watchKline() 自动去重订阅。
// data/exchanges/ws-manager.ts
const wsInstances = new Map<string, BinanceWsClient>();
function getWsInstance(exchange: string, type: PairType): BinanceWsClient {
const key = `${exchange}:${type}`;
if (!wsInstances.has(key)) {
wsInstances.set(key, new BinanceWsClient(exchange, type));
}
return wsInstances.get(key)!;
}
export function watchKline({ exchange, type, symbol }) {
getWsInstance(exchange, type).watch(symbol);
}
适配器实现矩阵(当前仅 Binance,多交易所为远期):
交易所 REST WS 备注 Binance 现货 BinanceRestClientBinanceWsClient官方 SDK Binance USDT-M BinanceFuturesRestClientBinanceWsClient官方 SDK,exchange 统一为 "binance"OKX — 远期 — — 远期 — Bybit — 远期 — — 远期 —
4.6 数据补全服务
数据补全由 service/backfill.ts 实现,run/exchange.ts 为独立启动入口(委托调用 backfillKlines()),run/start.ts 为完整实时流程入口(回补 → WS 订阅 → 聚合刷新)。
核心机制 — 基于 last_backfill_time 的增量回补
trading_pairs 表:
┌────┬──────────┬──────┬──────────────────────┬──────────────────────┐
│ id │ symbol │ type │ last_backfill_time │ last_ws_time │
├────┼──────────┼──────┼──────────────────────┼──────────────────────┤
│ 1 │ BTCUSDT │ spot │ 2026-01-01 12:00:00 │ 2026-01-01 12:01:00 │
│ 2 │ BTCUSDT │ um │ 2026-01-01 12:00:00 │ 2026-01-01 12:01:00 │
│ 3 │ ETHUSDT │ spot │ 2026-01-01 08:00:00 │ 2026-01-01 08:01:00 │
└────┴──────────┴──────┴──────────────────────┴──────────────────────┘
last_backfill_time初始值为1970-01-01T00:00:00Z(epoch 起点),新交易对自动触发全量回补- 每批拉取后更新为最后一条 K 线的
openTime - 重启时从断点续拉,不重复
回补流程(backfill.ts)
backfillKlines()
→ getAllPairs() 获取所有交易对
→ 逐对调用 backfillKline(pair):
while (lastBackfillTime < getPrevMinuteMS()):
fetchKlines({ exchange, type, symbol, startTime, limit: 1000 })
→ upsertOrUpdateKlines(klines) // 5 列冲突键 UPSERT
→ updatePairLastBackfillTime() // 更新断点
→ 批次间随机延迟 0~1s(防限频)
→ 逐个 emit("pair:ready") / emit("pair:failed")
→ emit("backfill:complete")
- 容错策略:任何异常标记为
error,中断所有回补,由外部进程管理器重启续拉 - 单次请求 10s 超时,防止 SDK 僵死
Bus 事件驱动的启动时序(start.ts)
1. 注册 bus 事件监听
├── pair:ready → watchKline(订阅 WS Kline 流)
├── kline:update → upsertOrUpdateKlines + updatePairWsTime → emit kline:saved
├── kline:saved → checkAndRefresh(连续聚合刷新)
├── backfill:complete → 统计输出 / 失败退出
└── ws:disconnected/connected/stale → 断线重连检测(5 分钟观察窗口)
2. await backfillKlines() — 逐个 pair 回补,自动 emit pair:ready
3. 所有 pair 就绪后,WS 实时行情持续运行
4. SIGTERM/SIGINT → AppDataSource.destroy() → process.exit
# 独立回补(不启动 WS)
bun run data/run/exchange.ts
# 完整实时流程(回补 → WS 订阅 → 聚合刷新)
bun run data/run/start.ts
5. 数据流生命周期
┌──────────────────┐
│ Binance REST │── backfill ────→┌──────────────────┐
│ (官方 SDK) │ │ TimescaleDB │
└──────────────────┘ │ Klines (1m) │
└────────┬─────────┘
┌──────────────────┐ │
│ Binance WS │── kline:update ──────────┤
│ (官方 SDK) │ │ │
└────────┬─────────┘ ▼ │
│ upsertOrUpdateKlines │
│ │ │
│ ▼ ▼
│ 连续聚合刷新 ←── kline:saved
│ (klines_5m/15m/1h/...)
│
│ pair:ready ← backfill 完成后触发
│ ws:disconnected / ws:connected / ws:stale
│
┌────────┴──────────────────────────┐
│ run/start.ts(编排层) │
│ 注册 bus 监听 → backfillKlines() │
│ → WS 订阅 → 入库 → 聚合刷新 │
└───────────────────────────────────┘
┌───────────────────────────────────┐
│ TypeORM 实体(关系数据) │
│ Exchange / TradingPair │
└───────────────────────────────────┘
启动时序
入口文件:run/start.ts(编排层,不直接操作 WS 实例、不写库、不刷新聚合)
1. 模块 import 阶段(隐式初始化)
├── config/index.ts 读取 env.yaml → 零依赖校验
├── utils/logger.ts 初始化 Pino
└── db/data-source.ts TypeORM DataSource.initialize()
└── 连接 PostgreSQL(DDL 由 init-db/ SQL 脚本手动管理)
2. 注册 bus 事件监听(桥接各 service 模块)
├── pair:ready → watchKline(订阅 WS Kline 流)
├── kline:update → upsertOrUpdateKlines + updatePairWsTime → emit kline:saved
├── kline:saved → checkAndRefresh(连续聚合刷新) → emit aggregate:refreshed
├── backfill:complete → 统计输出 / 失败退出
└── ws:disconnected / ws:connected / ws:stale → 断线重连检测(5 分钟观察窗口)
3. 调起 backfillKlines()(逐个 pair 回补,自动 emit pair:ready / backfill:complete)
4. 所有交易对就绪后,WS 实时行情持续运行
5. 注册 SIGTERM/SIGINT → AppDataSource.destroy() → process.exit
6. TypeORM + TimescaleDB 集成细节
6.1 写入路径选择
| 场景 | 方案 | 原因 |
|---|---|---|
| 关系数据 CRUD(交易对配置、交易所) | TypeORM Repository API | 标准 ORM 操作,事务支持,类型安全 |
| K 线批量写入(高频) | TypeORM Repository upsert() + 批量 |
TypeORM 的 upsert 方法支持 ON CONFLICT DO UPDATE |
| K 线海量写入(极端场景) | 裸 pg Pool + INSERT ... ON CONFLICT |
绕过 ORM 开销,直接参数化 SQL,批量 500 条/次 |
// 方案 A: TypeORM Repository upsert(推荐日常使用)
const klineRepo = AppDataSource.getRepository(Kline);
await klineRepo.upsert(batchRecords, {
conflictPaths: ["exchange", "symbol", "type", "interval", "time"],
skipUpdateIfNoValuesChanged: true,
});
// 方案 B: 裸 pg 批量写入(极端性能场景)
import { Pool } from "pg";
const pool = new Pool(pgsql);
await pool.query(`
INSERT INTO klines (...) VALUES ${values}
ON CONFLICT (exchange, symbol, type, interval, time) DO UPDATE SET
high = GREATEST(klines.high, EXCLUDED.high),
low = LEAST(klines.low, EXCLUDED.low),
close = EXCLUDED.close,
volume = klines.volume + EXCLUDED.volume,
updated_at = NOW()
`, params);
6.3 TimescaleDB 特定配置
synchronize: false,DDL 全部由 db/init-db/ SQL 脚本手动管理,不依赖 TypeORM 自动迁移:
-- db/init-db/01-timescaledb.sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- db/init-db/02-init-tables.sql
-- CREATE TABLE klines (...) + SELECT create_hypertable(...) + 压缩策略
-- db/init-db/03-continuous-aggregates.sql
-- 12 层连续聚合视图(3m → 5m → 15m → ... → 1mon)
@timescaledb/typeorm 仅用于实体装饰器(@Hypertable / @TimeColumn),让 TypeORM 知道如何映射运行时查询,不参与 DDL 生成。
SQL 初始化脚本执行方式:Docker Compose 挂载 ./data/db/init-db/ 到容器初始化目录,首次启动自动执行。
7. 配置管理策略
7.1 配置文件
项目根目录 env.yaml 为统一环境配置源,TypeScript(data/config/)和 Python 模块共享同一份 YAML。
<project_root>/
├── env.yaml ← 统一配置源(YAML,TS/Python 共享)
└── data/
└── config/
├── index.ts ← 读取 env.yaml → 校验 → 导出分组配置
└── validators.ts ← 零依赖运行时校验(assertString / assertPort / assertEnum)
7.2 env.yaml 配置段
# --- TimescaleDB / PostgreSQL ---
db:
host: localhost
port: 5432
name: trade
user: trader
password: fucketh
# --- Redis ---
redis:
url: redis://localhost:6379
publish_enabled: true
# --- 日志 ---
logging:
level: debug # trace / debug / info / warn / error / fatal
node_env: development # development / production / test
7.3 使用方式
// 任何模块中导入配置(所有导出对象均为强类型 as const)
import { pgsql, redis, logging } from "../config";
// TypeORM DataSource
const ds = new DataSource({ type: "postgres", ...pgsql });
// Redis 客户端
const redisClient = new Redis(redis.url);
// 环境判断
if (logging.nodeEnv === "development") {
logger.info("Config loaded");
}
8. 日志与可观测性
8.1 Pino 日志配置
// data/logger.ts
import pino from "pino";
import { logging } from "../config";
export const logger = pino({
level: logging.level,
// 开发环境:使用 pino-pretty 彩色输出
// 生产环境:JSON 格式,便于 ELK / Loki 采集
...(logging.pretty
? { transport: { target: "pino-pretty", options: { colorize: true } } }
: {}),
// 自动注入模块名
base: { module: "trade-data" },
// 序列化 Error 对象
serializers: {
err: pino.stdSerializers.err,
},
});
8.2 日志规范
| 级别 | 使用场景 | 示例 |
|---|---|---|
trace |
每条 Trade 的详细处理过程 | 开发调试用,生产关闭 |
debug |
批量写入、WS 心跳 | "Kline batch flushed: count=500" |
info |
模块启动/停止、连接建立 | "Binance WS connected: symbols=15" |
warn |
重连、数据异常、限频触发 | "WS reconnecting: attempt=3, delay=12000ms" |
error |
写入失败、WS 不可恢复 | "TimescaleDB write failed: connection refused" |
fatal |
进程即将退出 | "Unrecoverable error, shutting down" |
安全原则:严禁在日志中记录 API Key、Secret、数据库密码等敏感信息。使用
logger.info({ apiKey: "***" })脱敏。
9. 错误处理与容错
9.1 错误分类与策略
| 错误类型 | 处理策略 | 恢复方式 |
|---|---|---|
| WebSocket 断线 | 指数退避重连(基数 3s,上限 10 次) | 自动重连 → 重新订阅 → 补齐缺失数据 |
| 数据库写入失败 | 缓冲重试 3 次 → 丢弃并告警 | 下一批数据正常写入 |
| 数据库连接断开 | pg Pool 自动重连 | TypeORM DataSource 重建 |
| Redis 发布失败 | 静默丢弃(发布是非关键路径) | 下一 tick 继续发布 |
| 配置校验失败 | 进程退出(fail-fast) | 修正 .env 后重启 |
| 交易所 API 限频 | 令牌桶限流,429 响应冷却 60s | 自动恢复 |
9.2 优雅关闭(Graceful Shutdown)
// data/run/start.ts — 实际优雅关闭实现
process.on("SIGTERM", () => {
logger.info("收到 SIGTERM,关闭连接");
AppDataSource.destroy().finally(() => process.exit(0));
});
process.on("SIGINT", () => {
logger.info("收到 SIGINT,关闭连接");
AppDataSource.destroy().finally(() => process.exit(0));
});
当前为最小实现:仅关闭 TypeORM DataSource。WebSocket、Redis 等远期组件将来加入关闭链。
10. 性能考量
10.1 关键性能指标
| 指标 | 目标 | 说明 |
|---|---|---|
| 单笔 Trade 处理延迟 | < 1 μs | 时间桶 HashMap 查找 + 更新 |
| WebSocket 消息吞吐 | > 10,000 msg/s | 单进程处理全部订阅对 |
| 批量写入延迟 | < 100 ms | 500 条批量 UPSERT |
| 内存占用(10 symbol × 5 周期) | < 50 MB | 时间桶缓存 |
| 进程启动时间 | < 3 s | Bun 启动 + TypeORM 初始化 |
10.2 优化策略
| 策略 | 实现 | 收益 |
|---|---|---|
| 批量写入 | 500 条 / 1s 批量刷新 | 减少 99% DB 连接开销 |
| TimescaleDB 压缩 | 7 天后自动列式压缩 | 存储减少 90%+ |
| 连续聚合 | 高周期从低周期视图读取 | 查询快 100×(1h K 线无需扫描 1m 表) |
| 连接池复用 | TypeORM DataSource 管理 20 连接 | 防止连接数暴涨 |
| JSON 序列化 | 生产环境考虑 MessagePack | 体积减少 30-50%,序列化快 2× |
10.3 磁盘空间估算
| 数据量(1m K 线) | 未压缩 | 压缩后(92%) |
|---|---|---|
| 10 币种 × 1 年 | ~945 MB | ~75 MB |
| 50 币种 × 1 年 | ~4.7 GB | ~375 MB |
| 200 币种 × 1 年 | ~18.9 GB | ~1.5 GB |
10.4 负载模型
本模块非 HTTP 服务,单进程单租户运行。以下模型用于评估优化收益和容量规划,避免以"通用库"视角高估问题优先级。
运行模式
| 阶段 | 触发 | 网络特征 | 持续 |
|---|---|---|---|
| 回补 | cron / 手动 | REST API 分页拉取,受 --concurrency 和 200ms 限频约束 |
一次性,时长按数据缺口 |
| 实时采集 | 常驻进程 | WebSocket 订阅,被动接收 | 7×24 |
日志量
日志量由 K 线间隔锁定,不受交易活跃度影响:
日志条数/秒 ≈ 交易对数 / 60
| 交易对数 | 日志条/秒 | resolveCaller CPU 占比(单核) |
|---|---|---|
| 10 | 0.17 | < 0.001% |
| 100 | 1.67 | < 0.01% |
| 500 | 8.33 | < 0.03% |
resolveCaller 的 new Error().stack 开销(~0.3ms/条)在万级日志/秒以下无实际影响,不值得为此牺牲生产排障的 caller 字段。该问题已标记为不修复,详见 docs/tech-debt.md #6。
数据库写入
| 数据类型 | 频率 | 写入方式 |
|---|---|---|
| 1m K 线 | 每交易对每分钟 1 条 | upsertOrUpdateKlines 单条写入 |
| 逐笔成交 (Trade) | 高峰 >100 tps/交易对 | 批量写入(待实现) |
100 交易对场景下 K 线写入量 ≈ 100 条/分钟,不存在写入瓶颈。
内存
| 结构 | 单条估算 | 100 交易对 |
|---|---|---|
| K 线时间桶 (1m) | ~200 bytes/桶 | 100 × 200B = 20 KB |
| 5m/15m/1h/4h/1d 聚合桶 | ~200 bytes/桶/周期 | 100 × 5 × 200B = 100 KB |
总内存占用 < 10 MB(不含 Bun 运行时和 TypeORM 连接池),远低于 §10.1 指标目标(< 50 MB)。
11. 开发工作流
11.1 本地开发
# 1. 启动基础服务
docker compose up -d timescaledb
# 2. 安装依赖
cd data && bun install
# 3. 配置环境
# 编辑项目根目录 env.yaml(如不存在则创建)
# 4. 启动数据模块
bun run run/start.ts
# 5. 运行测试
bun test
11.2 新增交易所适配器
# 1. 创建适配器文件
touch exchanges/new-exchange.ts
# 2. 实现 MarketDataFeed 接口
# 3. 在 types/base.ts 注册类型,在 exchanges/rest-registry.ts 注册适配器(index.ts 为纯 re-export)
# 4. 编写测试 tests/exchanges/new-exchange.test.ts
# 5. 如需在数据库中配置,插入 exchanges 表和 trading_pairs 表
11.3 数据库迁移
# 生成迁移文件(开发环境)
bunx typeorm migration:generate db/migrations/AddNewColumn -d db/data-source.ts
# 执行迁移(生产环境)
bunx typeorm migration:run -d db/data-source.ts
# 回滚最近一次迁移
bunx typeorm migration:revert -d db/data-source.ts
11.4 目录与文件命名规范
| 规范 | 说明 |
|---|---|
| 文件名 | kebab-case.ts |
| 类名 | PascalCase |
| 函数/变量 | camelCase |
| 常量 | UPPER_SNAKE_CASE |
| 类型/接口 | PascalCase |
| 测试文件 | *.test.ts(与源文件同目录或 tests/ 下镜像结构) |
11.5 数据补全
# 独立回补(不启动 WS,逐对拉取→入库→更新时间戳)
bun run data/run/exchange.ts
# 完整实时流程(回补 → WS 订阅 → 聚合刷新)
bun run data/run/start.ts
# 聚合视图刷新
bun run data/run/build_aggregates_sql.ts # dry-run
bun run data/run/build_aggregates_sql.ts --execute # 执行刷新
12. 风险提示
⚠️ 重要声明:本数据模块为量化交易系统的技术基础设施,仅提供数据采集、存储与发布功能。任何基于本系统构建的交易策略,其盈亏风险由使用者自行承担。本系统作者不提供任何形式的投资建议,不对策略收益做任何承诺。
技术风险
| 风险 | 影响 | 缓解措施 |
|---|---|---|
| 交易所 API 变更 | 数据断流 | 适配器模式隔离,单个交易所变更不影响其他 |
| WebSocket 数据丢失 | K 线不完整 | REST 补拉机制 + gap 标记 |
| 数据库写入瓶颈 | 数据积压 | 批量写入 + 连接池 |
| 时钟偏差 | K 线时间戳错位 | NTP 时钟同步 + UTC 统一对齐 |
实盘接入前置条件
当前处于开发阶段,以下条件为远期目标。
- ✅ 模拟盘环境连续运行 ≥ 2 周无致命错误
- ✅ 核心模块单元测试覆盖率 ≥ 80%
- ✅ 数据库写入延迟 p99 < 200ms
- ✅ WebSocket 断线自动恢复成功率 ≥ 99%
- ✅ 日志采集与告警链路就绪
附录 A:docker-compose.yml 数据库配置参考
# docker-compose.yml(项目根目录)
services:
timescaledb:
image: timescale/timescaledb-ha:pg17.10-ts2.27.1
container_name: trade-timescaledb
restart: unless-stopped
ports:
- "5432:5432"
environment:
POSTGRES_DB: trade
POSTGRES_USER: trader
POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
volumes:
- ./db/pgsql:/var/lib/postgresql # 数据持久化
- ./data/init-db:/docker-entrypoint-initdb.d # 自动执行建表 SQL(fallback)
command: >
-c shared_buffers=1GB
-c effective_cache_size=3GB
-c maintenance_work_mem=256MB
-c work_mem=64MB
-c wal_buffers=64MB
-c random_page_cost=1.1
-c effective_io_concurrency=200
-c max_connections=50
healthcheck:
test: ["CMD-SHELL", "pg_isready -U trader -d trade"]
interval: 10s
timeout: 5s
retries: 5
附录 B:核心依赖版本锁定
| 包 | 版本 | 说明 |
|---|---|---|
typescript |
^6.0 | 语言编译器 |
bun |
≥1.3 | 运行时 |
typeorm |
^1.0 | 关系数据 ORM |
@timescaledb/typeorm |
^0.0.1 | TimescaleDB TypeORM 扩展(实验性,仅装饰器映射) |
pg |
^8.21 | PostgreSQL 原生驱动 |
binance |
^3.5 | Binance 官方 SDK(MainClient + USDMClient + WebSocket) |
yaml |
^2.9 | YAML 解析(env.yaml) |
pino |
^10.3 | 结构化日志 |
ioredis |
^5.11 | Redis 客户端(远期) |
ccxt |
^4.5 | 多交易所统一 API(远期) |
ws |
^8.21 | 通用 WebSocket 客户端(远期) |