Files
trade/data/ARCHITECTURE.md
Rekey 10e2364cf1 fix(aggregate): 修正聚合刷新触发时机 — 桶闭合时触发而非桶首
- aggregate.ts: openTime % ms === 0 → (openTime + 60_000) % ms === 0
  确保桶内最后一根1m K线到达(桶已闭合)时才触发聚合刷新
- AGENTS.md: 新增 K线 time 字段与 time_bucket 边界说明
- ARCHITECTURE.md: 新增 §4.3 聚合时间边界与刷新时机 小节
- websocket-realtime.md: 同步更新 checkAndRefresh 判定公式和表格
2026-06-19 00:27:02 +08:00

48 KiB
Raw Permalink Blame History

Trade Data Module — 技术架构说明

模块定位:数字货币量化交易系统的数据层,负责多交易所行情采集、K 线合成、时序数据持久化与实时发布。

运行时Bun | 语言TypeScript 5.x | 数据库PostgreSQL + TimescaleDB | ORMTypeORM + @timescaledb/typeorm


目录

  1. 技术选型与依赖矩阵
  2. 整体架构
  3. 目录结构
  4. 核心模块设计
  5. 数据流生命周期
  6. TypeORM + TimescaleDB 集成细节
  7. 配置管理策略
  8. 日志与可观测性
  9. 错误处理与容错
  10. 性能考量
  11. 开发工作流
  12. 风险提示

1. 技术选型与依赖矩阵

1.1 为什么选择 Bun + TypeScript

决策点 选型 理由
运行时 Bun 原生支持 TypeScript,零配置运行 .ts 文件;启动速度比 Node.js 快 4×;兼容 Node.js ecosystem
语言 TypeScript 5.x 编译期类型检查,在数据管道中消除 undefined / 类型不匹配导致的运行时崩溃
包管理 bun install 与 Bun 运行时一体,安装速度比 npm 快 25×
测试 Vitest 原生 ESM 支持,与 Bun 兼容良好(已有 4 个测试文件,覆盖率建设中)

1.2 依赖矩阵

类别 依赖包 版本 用途 状态
ORM 框架 typeorm ^1.0 关系数据(交易对配置、交易所元数据)的 ORM 映射 使用中
时序 ORM @timescaledb/typeorm ^0.0.1 Hypertable 实体装饰器(仅实体定义,不负责 DDL 迁移) 使用中
PG 驱动 pg ^8.21 TypeORM 底层 PG 连接 使用中
交易所 SDK binance ^3.5 Binance 官方 SDKMainClient(现货)、USDMClient(合约)、WebSocket 使用中
日志 pino ^10.3 高性能结构化日志(Bun 下吞吐 > 100k/s 使用中
YAML 解析 yaml ^2.9 解析项目根目录 env.yaml 配置文件 使用中
消息总线 node:events 内建 EventEmitter 类型安全封装(utils/bus.ts),模块间解耦通信 使用中
Redis ioredis ^5.11 Pub/Sub 行情发布(设计阶段,尚未集成) 远期
统一交易所 ccxt ^4.5 多交易所统一 API(设计阶段,当前仅 Binance) 远期
通用 WS ws ^8.21 非 Binance 交易所 WebSocket(设计阶段) 远期

1.3 TypeORM + TimescaleDB 分工边界

┌─────────────────────────────────────────────────────────────┐
│                    PostgreSQL 实例                            │
│                                                              │
│  ┌─────────────────────────────────────┐                    │
│  │        TypeORM 管理域(关系数据)      │                    │
│  │  · exchanges(交易所配置)            │                    │
│  │  · trading_pairs(交易对配置)         │                    │
│  │  → Migration: init-db SQL 脚本        │                    │
│  └─────────────────────────────────────┘                    │
│                                                              │
│  ┌─────────────────────────────────────┐                    │
│  │  @timescaledb/typeorm 管理域(时序)  │                    │
│  │  · klines1m K 线 hypertable       │                    │
│  │  · klines_3m~1mon12 层连续聚合视图) │                    │
│  │  → DDL: init-db SQL 脚本              │                    │
│  │  → 实体: @Hypertable 装饰器仅用于      │                    │
│  │    TypeORM 运行时映射,不负责建表       │                    │
│  └─────────────────────────────────────┘                    │
└─────────────────────────────────────────────────────────────┘

设计原则:TypeORM 管理结构稳定的关系数据;TimescaleDB hypertable 和连续聚合视图通过 db/init-db/ SQL 脚本手动管理(synchronize: false)。@timescaledb/typeorm 仅用于实体装饰器(@Hypertable / @TimeColumn),不参与 DDL 迁移。


2. 整体架构

┌──────────────────────────────────────────────────────────────────┐
│                    Trade Data Module (Bun + TS)                   │
│                                                                   │
│  ┌──────────────────┐                    ┌──────────────────┐    │
│  │  Binance REST    │── backfill ──────→│  TimescaleDB     │    │
│  │  (官方 SDK)      │                    │  Klines (1m)     │    │
│  │  · MainClient    │                    └────────┬─────────┘    │
│  │  · USDMClient    │                             │              │
│  └──────────────────┘                             │              │
│                                                   │              │
│  ┌──────────────────┐    Bus Events (node:events) │              │
│  │  Binance WS      │── kline:update ────→ upsert─┘              │
│  │  (官方 SDK)      │         ↓                                   │
│  └────────┬─────────┘   kline:saved ────→ 连续聚合刷新            │
│           │                                                       │
│           │  pair:ready ← backfill 完成后触发                     │
│           │  ws:disconnected / ws:connected / ws:stale            │
│           │                                                       │
│  ┌────────┴──────────────────────────┐                           │
│  │  run/start.ts(编排层)             │                           │
│  │  注册 bus 监听 → backfillKlines()  │                           │
│  │  → WS 订阅 → 入库 → 聚合刷新        │                           │
│  └───────────────────────────────────┘                           │
│                                                                   │
│  ┌───────────────────────────────────┐                           │
│  │  TypeORM 实体(关系数据)            │                           │
│  │  Exchange / TradingPair           │                           │
│  └───────────────────────────────────┘                           │
│                                                                   │
│  ┌───────────────────────────────────┐                           │
│  │  rest-registry.ts                 │                           │
│  │  fetchKlines() 根据 type 自动路由  │                           │
│  │  spot → binance, um/cm → futures  │                           │
│  └───────────────────────────────────┘                           │
└──────────────────────────────────────────────────────────────────┘

3. 目录结构

data/
├── ARCHITECTURE.md                # ← 本文件:技术架构说明
├── package.json                   # 依赖与脚本
├── tsconfig.json                  # TypeScript 配置
├── bun.lock                       # Bun 依赖锁定文件
│
├── run/                           # 启动文件
│   ├── start.ts                   # 模块入口:注册 bus 事件监听 → 回补 → WS 订阅 → 入库 → 聚合刷新 → 优雅关闭
│   └── exchange.ts                # 数据补全入口:委托 backfillKlines() 逐交易对回补
│
├── config/                        # 中心化配置模块
│   ├── index.ts                   # 配置加载与分组导出(pgsql / redis / exchange / logging
│   └── validators.ts              # 零依赖运行时校验(env.yaml → EnvConfig
│
├── db/                            # 数据库层
│   ├── data-source.ts             # TypeORM DataSource 配置(synchronize: false
│   ├── entities/                  # ORM 实体
│   │   ├── index.ts               # 实体统一导出
│   │   ├── common.entity.ts       # 公共基类(UUID 主键 + created_at/updated_at
│   │   ├── exchange.entity.ts     # 交易所(binance/okx/bybit
│   │   ├── trading-pair.entity.ts # 交易对(含 type 列区分 spot/um/cm
│   │   └── kline.entity.ts        # TimescaleDB K 线 hypertable1m5 列复合 PK
│   └── init-db/                   # 数据库初始化 SQL(DDL、连续聚合视图、种子数据)
│
├── exchanges/                     # 交易所适配器
│   ├── base.ts                    # BaseRestClient / BaseWsClient 抽象基类
│   ├── constants.ts               # KLINE_INTERVAL_MS 等常量
│   ├── index.ts                   # Re-export 入口
│   ├── rest-registry.ts           # REST 客户端注册表 + fetchKlines() 自动路由
│   ├── ws-manager.ts              # WS 实例池管理 + watchKline/watchKlines
│   └── binance/
│       ├── rest.ts                # BinanceRestClient(现货)+ BinanceFuturesRestClient(合约)
│       └── ws.ts                  # BinanceWsClientWebSocket 订阅)
│
├── service/                       # 业务服务层
│   ├── backfill.ts                # 回补编排:遍历交易对,逐对拉取→入库→更新时间戳
│   ├── kline.ts                   # upsertOrUpdateKlines(批量 UPSERT5 列冲突键)
│   ├── pair.ts                    # getAllPairs / getPairLastBackfillTime / updatePairWsTime
│   └── aggregate.ts               # checkAndRefresh(连续聚合刷新触发)
│
├── types/                         # 共享类型定义
│   ├── base.ts                    # Kline / Ticker / Trade / FetchKlinesParams 等接口
│   └── kline.ts                   # PairType / KlineInterval 等枚举
│
├── utils/                         # 工具函数
│   ├── index.ts                   # getNowMinuteMS / getPrevMinuteMS / wait
│   ├── bus.ts                     # TypedEventBusnode:events 类型安全封装)
│   └── logger.ts                  # Pino 日志实例
│
└── tests/                         # 测试(部分就绪,覆盖率建设中)

4. 核心模块设计

4.1 配置模块 config/

加载流程:
  项目根目录 env.yaml → parseYaml() 解析 → validateConfig() 零依赖校验
  → 按职责分组导出: pgsql / redis / logging
导出对象 字段 来源 说明
pgsql host, port, database, user, password, max, idleTimeoutMillis, connectionTimeoutMillis env.yaml → 校验 + 硬编码 pg Pool 配置,TypeORM DataSource 复用;max/idleTimeoutMillis/connectionTimeoutMillis 硬编码
redis url, publishEnabled, channelPrefix, retryDelayBaseMs, maxRetries env.yaml → 校验 + 硬编码 ioredis 连接与 Pub/Sub 配置;channelPrefix/retryDelayBaseMs/maxRetries 硬编码
logging level, nodeEnv, pretty env.yaml → 校验 Pino 日志配置;prettynode_env === "development" 推导

设计原则:仅数据库连接、Redis 连接、日志级别等运维配置通过 env.yaml 暴露。配置源为项目根目录的 env.yamlTypeScript/Python 模块共享同一份配置。

校验定义位置config/validators.ts — 零依赖运行时校验(assertString / assertPort / assertBoolean / assertEnum),fail-fast 原则。

配置分组详情

// —— pgsql: 数据库连接 ——
export const pgsql = {
    host: rawConfig.db.host,            // env.yaml default: "localhost"
    port: rawConfig.db.port,            // env.yaml default: 5432
    database: rawConfig.db.name,        // env.yaml default: "trade"
    user: rawConfig.db.user,            // env.yaml default: "trader"
    password: rawConfig.db.password,    // env.yaml required
    max: 20,                            // 硬编码:连接池上限
    idleTimeoutMillis: 30000,           // 硬编码:空闲超时 30s
    connectionTimeoutMillis: 5000,      // 硬编码:连接获取超时 5s
} as const;

// —— redis: 消息队列 ——
export const redis = {
    url: rawConfig.redis.url,                           // env.yaml default: "redis://localhost:6379"
    publishEnabled: rawConfig.redis.publish_enabled,    // env.yaml default: true
    channelPrefix: "trade",                             // 硬编码
    retryDelayBaseMs: 1000,                             // 硬编码
    maxRetries: 10,                                     // 硬编码
} as const;

// —— logging: 日志 ——
export const logging = {
    level: rawConfig.logging.level,         // env.yaml default: "debug"
    nodeEnv: rawConfig.logging.node_env,    // env.yaml default: "development"
    pretty: rawConfig.logging.node_env === "development",
} as const;

4.2 TypeORM 数据源 db/data-source.ts

// data/db/data-source.ts
import { DataSource } from "typeorm";
import { pgsql } from "../config";
import * as entities from "./entities";

export const AppDataSource = new DataSource({
    type: "postgres",
    host: pgsql.host, port: pgsql.port,
    database: pgsql.database,
    username: pgsql.user, password: pgsql.password,
    // 所有实体(关系 + 时序)通过 db/entities/index.ts 统一注册
    entities: [...Object.values(entities)],
    synchronize: true, // 开发环境自动同步 schema
    migrations: [__dirname + "/migrations/*.{ts,js}"],
    extra: {
        max: pgsql.max,
        idleTimeoutMillis: pgsql.idleTimeoutMillis,
        connectionTimeoutMillis: pgsql.connectionTimeoutMillis,
    },
});
await AppDataSource.initialize();

实体注册原则

  • 关系实体Exchange, TradingPair):继承 CommonBaseEntityUUID 主键 + created_at/updated_at),标准 @Entity() 装饰器
  • 时序实体Kline):不继承 CommonBaseEntity,使用 @Hypertable() + @TimeColumn() 自动创建 TimescaleDB hypertable
  • 统一入口:所有实体通过 db/entities/index.ts 导出,...Object.values(entities) 自动注册

4.3 TimescaleDB K 线实体

唯一 K 线实体kline.entity.ts(1m)。高周期 K 线(3m / 5m / 15m / 30m / 1h / 2h / 4h / 6h / 8h / 1d / 1w / 1mon)由 TimescaleDB 连续聚合视图从 1m 基表自动生成,不创建独立实体。聚合刷新由 service/aggregate.ts 按时间桶对齐触发。

kline.entity.ts — 1m K 线 Hypertable

// data/db/entities/kline.entity.ts
import { Hypertable, TimeColumn } from "@timescaledb/typeorm";
import { Entity, PrimaryColumn, Column, CreateDateColumn, UpdateDateColumn } from "typeorm";
import type { KlineInterval, PairType } from '../../types';

@Hypertable({
    compression: {
        compress: true,
        compress_orderby: "time DESC",
        compress_segmentby: "exchange, symbol, type",  // ← 含 type
        policy: { schedule_interval: "30 days" },       // 30 天后压缩
    },
})
@Entity("klines")
export class Kline {
    /** 交易所标识(binance */
    @PrimaryColumn("text")
    exchange!: string;

    /** 交易对符号(如 BTCUSDT */
    @PrimaryColumn("text")
    symbol!: string;

    /** 交易对类型(spot / um / cm)— 五列复合主键之一 */
    @PrimaryColumn("text", { default: 'spot' })
    type!: PairType;

    /** K 线周期(1m */
    @PrimaryColumn("text")
    interval!: KlineInterval;

    /** K 线开盘时间(UTC)— TimescaleDB 时间分区键 */
    @TimeColumn()
    @PrimaryColumn("timestamptz")
    time!: Date;

    // OHLCV — NUMERIC(20,8)
    @Column("numeric", { precision: 20, scale: 8 })
    open!: number;
    // ... high / low / close / volume 同

    // 扩展字段
    @Column("numeric", { precision: 20, scale: 8, nullable: true })
    quote_volume?: number;
    @Column("numeric", { precision: 20, scale: 8, nullable: true })
    taker_buy_base_vol?: number;
    @Column("numeric", { precision: 20, scale: 8, nullable: true })
    taker_buy_quote_vol?: number;
    @Column("integer", { nullable: true })
    trade_count?: number;
    @Column("boolean", { default: true })
    is_closed!: boolean;

    @CreateDateColumn({ type: "timestamptz" })
    createdAt!: Date;
    @UpdateDateColumn({ type: "timestamptz" })
    updatedAt!: Date;
}

复合主键设计

5 列:(exchange, symbol, type, interval, time)type 列保证相同 symbol 的现货与合约 K 线不会主键冲突。

BTCUSDT + spot  → klines
BTCUSDT + um    → klines  ← 同一 symbol,不同 typePK 不冲突

为什么用 type 列而非 .P 后缀?

.P 后缀 type 字段
symbol 语义 污染标识符,BTCUSDT.P 不是交易对名称 symbol 与 Binance 官方一致
查询 WHERE symbol LIKE '%.P' 不走索引 WHERE type = 'um' 索引友好
API 调用 每次 strip 后缀 symbol 原样传入,零变换
扩展 Coin-M 需要 _PERP 新后缀规则 'cm' 枚举值即可

聚合时间边界与刷新时机

time 列语义:存储交易所原始 openTime(K 线开盘时间的 Unix 毫秒时间戳),不使用系统接收时间。这是连续聚合正确性的前提——回补的 3 天前 K 线若用"当前系统时间"写入,time_bucket 会将其分入错误的桶。

time_bucket 左闭右开time_bucket('5 minutes', time)time 向下取整到 5 分钟边界,桶区间为 [start, start + interval)

00:00 ─┐
00:01  │
00:02  │  5m 桶 [00:00, 00:05)
00:03  │  FIRST(open, time) = 00:00 的开盘价
00:04 ─┘  LAST(close, time)  = 00:04 的收盘价
00:05 ─┐  下一个 5m 桶 [00:05, 00:10)
  ...

聚合刷新触发时机:当桶内最后一根 1m K 线到达(桶已闭合)时触发,而非桶的第一分钟。判定条件 (openTime + 60_000) % period_ms === 0,即当前分钟结束边界刚好整除聚合周期。

聚合周期 桶范围 触发 1m K 线 判定
5m [00:00, 00:05) 00:04 (00:04 + 60s) % 300s = 0
15m [00:00, 00:15) 00:14 (00:14 + 60s) % 900s = 0
1h [00:00, 01:00) 00:59 (00:59 + 60s) % 3600s = 0

为什么不是 openTime % period_ms === 0:该条件在桶的第一分钟(如 00:00)触发,此时桶内仅 1 根 K 线,其他 4 根尚未到达,刷出的聚合 K 线不完整。延迟到桶闭合触发可保证聚合数据完整性。

TimescaleDB 配置

配置项 说明
chunk_time_interval 7 days ~450 chunks / 1000万行,查询剪枝与元数据管理平衡
compress_segmentby exchange, symbol, type 同交易对+类型聚合压缩(interval 固定 1m 无需分段)
compress_orderby time DESC 查询通常按时间降序
compress_after 30 days 30 天前数据自动列式压缩(压缩率 ~92%)

DDL 管理synchronize: false,不依赖 TypeORM 自动建表。hypertable 创建、压缩策略、连续聚合视图均由 db/init-db/ SQL 脚本管理。@timescaledb/typeorm 仅用于实体装饰器映射。


4.4 关系数据实体 db/entities/

trading-pair.entity.ts

// data/db/entities/trading-pair.entity.ts
@Entity("trading_pairs")
@Index(["exchange", "symbol", "type"], { unique: true }) // 同交易所下 symbol + type 唯一
@Index(["active"])
export class TradingPair extends CommonBaseEntity {
    @ManyToOne(() => Exchange, { nullable: false })
    @JoinColumn({ name: "exchange_id" })
    exchange!: Exchange;

    @Column("varchar", { length: 20 })
    symbol!: string;          // "BTCUSDT"

    @Column("text", { default: 'spot' })
    type!: PairType;          // "spot" | "um" | "cm" — 区分现货/合约

    @Column("varchar", { length: 10 })
    base_asset!: string;      // "BTC"

    @Column("varchar", { length: 10 })
    quote_asset!: string;     // "USDT"

    @Column("integer", { default: 10 })
    price_precision!: number;

    @Column("integer", { default: 10 })
    quantity_precision!: number;

    @Column("numeric", { precision: 32, scale: 8, nullable: true })
    min_qty?: number;

    @Column("numeric", { precision: 32, scale: 8, nullable: true })
    step_size?: number;

    @Column("numeric", { precision: 32, scale: 8, nullable: true })
    min_notional?: number;

    @Column("boolean", { default: true })
    active!: boolean;

    /** 历史 K 线最后补全时间(UTC)。新交易对默认为 epoch 起点,触发全量回补 */
    @Column("timestamptz", { default: () => "to_timestamp(0)" })
    last_backfill_time!: Date;

    /** WebSocket 最近一次写入的 1m K 线 openTimeUTC */
    @Column("timestamptz", { default: () => "to_timestamp(0)" })
    last_ws_time!: Date;

    @Column("text", { nullable: true })
    notes?: string;
}

实体 ER 关系

┌─────────────┐       ┌──────────────────┐
│   Exchange   │──1:N──│   TradingPair    │
│              │       │                  │
│ id (PK)      │       │ id (PK)          │
│ name (UQ)    │       │ exchange_id (FK) │
│ label        │       │ symbol           │
│ enabled      │       │ type ← 区分现货合约│
│ config       │       │ base_asset       │
└─────────────┘       │ quote_asset      │
                      │ last_backfill_time│
                      │ last_ws_time      │
                      │ active            │
                      └──────────────────┘

4.5 交易所适配器 exchanges/

REST 客户端

抽象基类 base.ts:提供限流(throttle)和配置注入,子类只需实现 fetchKlines()

// data/exchanges/base.ts
export abstract class BaseRestClient {
    abstract readonly exchange: string;
    protected readonly config: RestClientConfig;
    protected async throttle(key: string): Promise<void> { /* 冷却节流 */ }
    abstract fetchKlines(params: FetchKlinesParams): Promise<Kline[]>;
}

Binance 实现 binance/rest.ts:基于 Binance 官方 SDKMainClient + USDMClient),convertBinanceKline() 将 SDK 12 元组转为标准化 Kline。

// data/exchanges/binance/rest.ts
import { MainClient, USDMClient } from "binance";

export function convertBinanceKline(
    raw: BinanceRestKline, symbol: string, interval: KlineInterval, type: PairType,
): Kline {
    return {
        exchange: "binance",   // 现货和合约统一为 "binance",通过 type 列区分
        symbol, type, interval,
        openTime, closeTime,
        open: String(open), high: String(high), // ... OHLCV 均转 string 保持精度
        isClosed: true,
    };
}

export class BinanceRestClient extends BaseRestClient {
    readonly exchange = "binance";
    private client = new MainClient({ api_key, api_secret }, { timeout: 3000 });
    async fetchKlines(params) { /* getKlines → convertBinanceKline */ }
}

export class BinanceFuturesRestClient extends BaseRestClient {
    readonly exchange = "binance_futures"; // 仅标识用途,入库 exchange 仍为 "binance"
    private client = new USDMClient({ api_key, api_secret }, { timeout: 3000 });
    async fetchKlines(params) { /* getKlines → convertBinanceKline(同构 12 元组) */ }
}

REST 客户端注册与自动路由 rest-registry.ts

// 注册表:交易所 ID → 客户端构造器
const registry = {
    binance: () => new BinanceRestClient(),
    binance_futures: () => new BinanceFuturesRestClient(),
};

// fetchKlines() 根据 type 自动路由,调用方零感知
export async function fetchKlines(params: FetchKlinesParams): Promise<Kline[]> {
    const exchangeId = params.type === "spot"
        ? params.exchange                     // "binance"
        : `${params.exchange}_futures`;       // "binance_futures"
    const client = createRestClient(exchangeId);
    return client.fetchKlines(params);
}

设计要点:路由逻辑内聚在 fetchKlines() 一处。调用方(backfill.ts、未来其他模块)只需传 type 字段,无需感知 _futures 后缀规则。新增 Coin-M'cm')时只需在 fetchKlines() 中补充一条路由,不影响任何调用方。

WebSocket 客户端

抽象基类 base.ts

export abstract class BaseWsClient {
    abstract watch(symbol: string): void;
    abstract unwatch(symbol: string): void;
}

Binance 实现 binance/ws.ts:基于 Binance 官方 WebSocket SDK,收到闭合 K 线后标准化并通过 bus.emit("kline:update") 广播。

WS 实例池管理 ws-manager.ts:按 exchange:type 组合复用 WS 连接(同一 binance:spot 下多个 symbol 共享一个连接),watchKline() 自动去重订阅。

// data/exchanges/ws-manager.ts
const wsInstances = new Map<string, BinanceWsClient>();

function getWsInstance(exchange: string, type: PairType): BinanceWsClient {
    const key = `${exchange}:${type}`;
    if (!wsInstances.has(key)) {
        wsInstances.set(key, new BinanceWsClient(exchange, type));
    }
    return wsInstances.get(key)!;
}

export function watchKline({ exchange, type, symbol }) {
    getWsInstance(exchange, type).watch(symbol);
}

适配器实现矩阵(当前仅 Binance,多交易所为远期):

交易所 REST WS 备注
Binance 现货 BinanceRestClient BinanceWsClient 官方 SDK
Binance USDT-M BinanceFuturesRestClient BinanceWsClient 官方 SDKexchange 统一为 "binance"
OKX — 远期 — — 远期 —
Bybit — 远期 — — 远期 —

4.6 数据补全服务

数据补全由 service/backfill.ts 实现,run/exchange.ts 为独立启动入口(委托调用 backfillKlines()),run/start.ts 为完整实时流程入口(回补 → WS 订阅 → 聚合刷新)。

核心机制 — 基于 last_backfill_time 的增量回补

trading_pairs 表:

┌────┬──────────┬──────┬──────────────────────┬──────────────────────┐
│ id │  symbol  │ type │  last_backfill_time  │  last_ws_time        │
├────┼──────────┼──────┼──────────────────────┼──────────────────────┤
│  1 │ BTCUSDT  │ spot │ 2026-01-01 12:00:00  │ 2026-01-01 12:01:00  │
│  2 │ BTCUSDT  │ um   │ 2026-01-01 12:00:00  │ 2026-01-01 12:01:00  │
│  3 │ ETHUSDT  │ spot │ 2026-01-01 08:00:00  │ 2026-01-01 08:01:00  │
└────┴──────────┴──────┴──────────────────────┴──────────────────────┘
  • last_backfill_time 初始值为 1970-01-01T00:00:00Z(epoch 起点),新交易对自动触发全量回补
  • 每批拉取后更新为最后一条 K 线的 openTime
  • 重启时从断点续拉,不重复

回补流程(backfill.ts

backfillKlines()
  → getAllPairs() 获取所有交易对
  → 逐对调用 backfillKline(pair):
       while (lastBackfillTime < getPrevMinuteMS()):
         fetchKlines({ exchange, type, symbol, startTime, limit: 1000 })
         → upsertOrUpdateKlines(klines)      // 5 列冲突键 UPSERT
         → updatePairLastBackfillTime()       // 更新断点
         → 批次间随机延迟 0~1s(防限频)
  → 逐个 emit("pair:ready") / emit("pair:failed")
  → emit("backfill:complete")
  • 容错策略:任何异常标记为 error,中断所有回补,由外部进程管理器重启续拉
  • 单次请求 10s 超时,防止 SDK 僵死

Bus 事件驱动的启动时序(start.ts

1. 注册 bus 事件监听
   ├── pair:ready → watchKline(订阅 WS Kline 流)
   ├── kline:update → upsertOrUpdateKlines + updatePairWsTime → emit kline:saved
   ├── kline:saved → checkAndRefresh(连续聚合刷新)
   ├── backfill:complete → 统计输出 / 失败退出
   └── ws:disconnected/connected/stale → 断线重连检测(5 分钟观察窗口)
2. await backfillKlines() — 逐个 pair 回补,自动 emit pair:ready
3. 所有 pair 就绪后,WS 实时行情持续运行
4. SIGTERM/SIGINT → AppDataSource.destroy() → process.exit
# 独立回补(不启动 WS
bun run data/run/exchange.ts

# 完整实时流程(回补 → WS 订阅 → 聚合刷新)
bun run data/run/start.ts

5. 数据流生命周期

                    ┌──────────────────┐
                    │  Binance REST    │── backfill ────→┌──────────────────┐
                    │  (官方 SDK)      │                 │  TimescaleDB     │
                    └──────────────────┘                 │  Klines (1m)     │
                                                         └────────┬─────────┘
                    ┌──────────────────┐                          │
                    │  Binance WS      │── kline:update ──────────┤
                    │  (官方 SDK)      │         │                │
                    └────────┬─────────┘         ▼                │
                             │           upsertOrUpdateKlines    │
                             │                  │                │
                             │                  ▼                ▼
                             │           连续聚合刷新 ←── kline:saved
                             │           (klines_5m/15m/1h/...)
                             │
                             │  pair:ready ← backfill 完成后触发
                             │  ws:disconnected / ws:connected / ws:stale
                             │
                    ┌────────┴──────────────────────────┐
                    │  run/start.ts(编排层)             │
                    │  注册 bus 监听 → backfillKlines()  │
                    │  → WS 订阅 → 入库 → 聚合刷新        │
                    └───────────────────────────────────┘

                    ┌───────────────────────────────────┐
                    │  TypeORM 实体(关系数据)            │
                    │  Exchange / TradingPair           │
                    └───────────────────────────────────┘

启动时序

入口文件:run/start.ts(编排层,不直接操作 WS 实例、不写库、不刷新聚合)

1. 模块 import 阶段(隐式初始化)
   ├── config/index.ts 读取 env.yaml → 零依赖校验
   ├── utils/logger.ts 初始化 Pino
   └── db/data-source.ts TypeORM DataSource.initialize()
       └── 连接 PostgreSQLDDL 由 init-db/ SQL 脚本手动管理)
2. 注册 bus 事件监听(桥接各 service 模块)
   ├── pair:ready → watchKline(订阅 WS Kline 流)
   ├── kline:update → upsertOrUpdateKlines + updatePairWsTime → emit kline:saved
   ├── kline:saved → checkAndRefresh(连续聚合刷新) → emit aggregate:refreshed
   ├── backfill:complete → 统计输出 / 失败退出
   └── ws:disconnected / ws:connected / ws:stale → 断线重连检测(5 分钟观察窗口)
3. 调起 backfillKlines()(逐个 pair 回补,自动 emit pair:ready / backfill:complete
4. 所有交易对就绪后,WS 实时行情持续运行
5. 注册 SIGTERM/SIGINT → AppDataSource.destroy() → process.exit

6. TypeORM + TimescaleDB 集成细节

6.1 写入路径选择

场景 方案 原因
关系数据 CRUD(交易对配置、交易所) TypeORM Repository API 标准 ORM 操作,事务支持,类型安全
K 线批量写入(高频) TypeORM Repository upsert() + 批量 TypeORM 的 upsert 方法支持 ON CONFLICT DO UPDATE
K 线海量写入(极端场景) pg Pool + INSERT ... ON CONFLICT 绕过 ORM 开销,直接参数化 SQL,批量 500 条/次
// 方案 A: TypeORM Repository upsert(推荐日常使用)
const klineRepo = AppDataSource.getRepository(Kline);
await klineRepo.upsert(batchRecords, {
  conflictPaths: ["exchange", "symbol", "type", "interval", "time"],
  skipUpdateIfNoValuesChanged: true,
});

// 方案 B: 裸 pg 批量写入(极端性能场景)
import { Pool } from "pg";
const pool = new Pool(pgsql);
await pool.query(`
  INSERT INTO klines (...) VALUES ${values}
  ON CONFLICT (exchange, symbol, type, interval, time) DO UPDATE SET
    high = GREATEST(klines.high, EXCLUDED.high),
    low = LEAST(klines.low, EXCLUDED.low),
    close = EXCLUDED.close,
    volume = klines.volume + EXCLUDED.volume,
    updated_at = NOW()
`, params);

6.3 TimescaleDB 特定配置

synchronize: falseDDL 全部由 db/init-db/ SQL 脚本手动管理,不依赖 TypeORM 自动迁移:

-- db/init-db/01-timescaledb.sql
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- db/init-db/02-init-tables.sql
-- CREATE TABLE klines (...) + SELECT create_hypertable(...) + 压缩策略

-- db/init-db/03-continuous-aggregates.sql
-- 12 层连续聚合视图(3m → 5m → 15m → ... → 1mon

@timescaledb/typeorm 仅用于实体装饰器(@Hypertable / @TimeColumn),让 TypeORM 知道如何映射运行时查询,不参与 DDL 生成。

SQL 初始化脚本执行方式Docker Compose 挂载 ./data/db/init-db/ 到容器初始化目录,首次启动自动执行。


7. 配置管理策略

7.1 配置文件

项目根目录 env.yaml 为统一环境配置源,TypeScript(data/config/)和 Python 模块共享同一份 YAML。

<project_root>/
├── env.yaml          ← 统一配置源(YAMLTS/Python 共享)
└── data/
    └── config/
        ├── index.ts      ← 读取 env.yaml → 校验 → 导出分组配置
        └── validators.ts ← 零依赖运行时校验(assertString / assertPort / assertEnum

7.2 env.yaml 配置段

# --- TimescaleDB / PostgreSQL ---
db:
  host: localhost
  port: 5432
  name: trade
  user: trader
  password: fucketh

# --- Redis ---
redis:
  url: redis://localhost:6379
  publish_enabled: true

# --- 日志 ---
logging:
  level: debug          # trace / debug / info / warn / error / fatal
  node_env: development # development / production / test

7.3 使用方式

// 任何模块中导入配置(所有导出对象均为强类型 as const)
import { pgsql, redis, logging } from "../config";

// TypeORM DataSource
const ds = new DataSource({ type: "postgres", ...pgsql });

// Redis 客户端
const redisClient = new Redis(redis.url);

// 环境判断
if (logging.nodeEnv === "development") {
  logger.info("Config loaded");
}

8. 日志与可观测性

8.1 Pino 日志配置

// data/logger.ts
import pino from "pino";
import { logging } from "../config";

export const logger = pino({
  level: logging.level,
  // 开发环境:使用 pino-pretty 彩色输出
  // 生产环境:JSON 格式,便于 ELK / Loki 采集
  ...(logging.pretty
    ? { transport: { target: "pino-pretty", options: { colorize: true } } }
    : {}),
  // 自动注入模块名
  base: { module: "trade-data" },
  // 序列化 Error 对象
  serializers: {
    err: pino.stdSerializers.err,
  },
});

8.2 日志规范

级别 使用场景 示例
trace 每条 Trade 的详细处理过程 开发调试用,生产关闭
debug 批量写入、WS 心跳 "Kline batch flushed: count=500"
info 模块启动/停止、连接建立 "Binance WS connected: symbols=15"
warn 重连、数据异常、限频触发 "WS reconnecting: attempt=3, delay=12000ms"
error 写入失败、WS 不可恢复 "TimescaleDB write failed: connection refused"
fatal 进程即将退出 "Unrecoverable error, shutting down"

安全原则:严禁在日志中记录 API Key、Secret、数据库密码等敏感信息。使用 logger.info({ apiKey: "***" }) 脱敏。


9. 错误处理与容错

9.1 错误分类与策略

错误类型 处理策略 恢复方式
WebSocket 断线 指数退避重连(基数 3s,上限 10 次) 自动重连 → 重新订阅 → 补齐缺失数据
数据库写入失败 缓冲重试 3 次 → 丢弃并告警 下一批数据正常写入
数据库连接断开 pg Pool 自动重连 TypeORM DataSource 重建
Redis 发布失败 静默丢弃(发布是非关键路径) 下一 tick 继续发布
配置校验失败 进程退出(fail-fast 修正 .env 后重启
交易所 API 限频 令牌桶限流,429 响应冷却 60s 自动恢复

9.2 优雅关闭(Graceful Shutdown

// data/run/start.ts — 实际优雅关闭实现
process.on("SIGTERM", () => {
    logger.info("收到 SIGTERM,关闭连接");
    AppDataSource.destroy().finally(() => process.exit(0));
});

process.on("SIGINT", () => {
    logger.info("收到 SIGINT,关闭连接");
    AppDataSource.destroy().finally(() => process.exit(0));
});

当前为最小实现:仅关闭 TypeORM DataSource。WebSocket、Redis 等远期组件将来加入关闭链。


10. 性能考量

10.1 关键性能指标

指标 目标 说明
单笔 Trade 处理延迟 < 1 μs 时间桶 HashMap 查找 + 更新
WebSocket 消息吞吐 > 10,000 msg/s 单进程处理全部订阅对
批量写入延迟 < 100 ms 500 条批量 UPSERT
内存占用(10 symbol × 5 周期) < 50 MB 时间桶缓存
进程启动时间 < 3 s Bun 启动 + TypeORM 初始化

10.2 优化策略

策略 实现 收益
批量写入 500 条 / 1s 批量刷新 减少 99% DB 连接开销
TimescaleDB 压缩 7 天后自动列式压缩 存储减少 90%+
连续聚合 高周期从低周期视图读取 查询快 100×(1h K 线无需扫描 1m 表)
连接池复用 TypeORM DataSource 管理 20 连接 防止连接数暴涨
JSON 序列化 生产环境考虑 MessagePack 体积减少 30-50%,序列化快 2×

10.3 磁盘空间估算

数据量(1m K 线) 未压缩 压缩后(92%
10 币种 × 1 年 ~945 MB ~75 MB
50 币种 × 1 年 ~4.7 GB ~375 MB
200 币种 × 1 年 ~18.9 GB ~1.5 GB

10.4 负载模型

本模块非 HTTP 服务,单进程单租户运行。以下模型用于评估优化收益和容量规划,避免以"通用库"视角高估问题优先级。

运行模式

阶段 触发 网络特征 持续
回补 cron / 手动 REST API 分页拉取,受 --concurrency 和 200ms 限频约束 一次性,时长按数据缺口
实时采集 常驻进程 WebSocket 订阅,被动接收 7×24

日志量

日志量由 K 线间隔锁定,不受交易活跃度影响:

日志条数/秒 ≈ 交易对数 / 60
交易对数 日志条/秒 resolveCaller CPU 占比(单核)
10 0.17 < 0.001%
100 1.67 < 0.01%
500 8.33 < 0.03%

resolveCallernew Error().stack 开销(~0.3ms/条)在万级日志/秒以下无实际影响,不值得为此牺牲生产排障的 caller 字段。该问题已标记为不修复,详见 docs/tech-debt.md #6。

数据库写入

数据类型 频率 写入方式
1m K 线 每交易对每分钟 1 条 upsertOrUpdateKlines 单条写入
逐笔成交 (Trade) 高峰 >100 tps/交易对 批量写入(待实现)

100 交易对场景下 K 线写入量 ≈ 100 条/分钟,不存在写入瓶颈。

内存

结构 单条估算 100 交易对
K 线时间桶 (1m) ~200 bytes/桶 100 × 200B = 20 KB
5m/15m/1h/4h/1d 聚合桶 ~200 bytes/桶/周期 100 × 5 × 200B = 100 KB

总内存占用 < 10 MB(不含 Bun 运行时和 TypeORM 连接池),远低于 §10.1 指标目标(< 50 MB)。


11. 开发工作流

11.1 本地开发

# 1. 启动基础服务
docker compose up -d timescaledb

# 2. 安装依赖
cd data && bun install

# 3. 配置环境
# 编辑项目根目录 env.yaml(如不存在则创建)

# 4. 启动数据模块
bun run run/start.ts

# 5. 运行测试
bun test

11.2 新增交易所适配器

# 1. 创建适配器文件
touch exchanges/new-exchange.ts

# 2. 实现 MarketDataFeed 接口
# 3. 在 types/base.ts 注册类型,在 exchanges/rest-registry.ts 注册适配器(index.ts 为纯 re-export
# 4. 编写测试 tests/exchanges/new-exchange.test.ts
# 5. 如需在数据库中配置,插入 exchanges 表和 trading_pairs 表

11.3 数据库迁移

# 生成迁移文件(开发环境)
bunx typeorm migration:generate db/migrations/AddNewColumn -d db/data-source.ts

# 执行迁移(生产环境)
bunx typeorm migration:run -d db/data-source.ts

# 回滚最近一次迁移
bunx typeorm migration:revert -d db/data-source.ts

11.4 目录与文件命名规范

规范 说明
文件名 kebab-case.ts
类名 PascalCase
函数/变量 camelCase
常量 UPPER_SNAKE_CASE
类型/接口 PascalCase
测试文件 *.test.ts(与源文件同目录或 tests/ 下镜像结构)

11.5 数据补全

# 独立回补(不启动 WS,逐对拉取→入库→更新时间戳)
bun run data/run/exchange.ts

# 完整实时流程(回补 → WS 订阅 → 聚合刷新)
bun run data/run/start.ts

# 聚合视图刷新
bun run data/run/build_aggregates_sql.ts            # dry-run
bun run data/run/build_aggregates_sql.ts --execute  # 执行刷新

12. 风险提示

⚠️ 重要声明:本数据模块为量化交易系统的技术基础设施,仅提供数据采集、存储与发布功能。任何基于本系统构建的交易策略,其盈亏风险由使用者自行承担。本系统作者不提供任何形式的投资建议,不对策略收益做任何承诺。

技术风险

风险 影响 缓解措施
交易所 API 变更 数据断流 适配器模式隔离,单个交易所变更不影响其他
WebSocket 数据丢失 K 线不完整 REST 补拉机制 + gap 标记
数据库写入瓶颈 数据积压 批量写入 + 连接池
时钟偏差 K 线时间戳错位 NTP 时钟同步 + UTC 统一对齐

实盘接入前置条件

当前处于开发阶段,以下条件为远期目标。

  1. 模拟盘环境连续运行 ≥ 2 周无致命错误
  2. 核心模块单元测试覆盖率 ≥ 80%
  3. 数据库写入延迟 p99 < 200ms
  4. WebSocket 断线自动恢复成功率 ≥ 99%
  5. 日志采集与告警链路就绪

附录 Adocker-compose.yml 数据库配置参考

# docker-compose.yml(项目根目录)
services:
  timescaledb:
    image: timescale/timescaledb-ha:pg17.10-ts2.27.1
    container_name: trade-timescaledb
    restart: unless-stopped
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: trade
      POSTGRES_USER: trader
      POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
    volumes:
      - ./db/pgsql:/var/lib/postgresql          # 数据持久化
      - ./data/init-db:/docker-entrypoint-initdb.d # 自动执行建表 SQLfallback
    command: >
      -c shared_buffers=1GB
      -c effective_cache_size=3GB
      -c maintenance_work_mem=256MB
      -c work_mem=64MB
      -c wal_buffers=64MB
      -c random_page_cost=1.1
      -c effective_io_concurrency=200
      -c max_connections=50
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U trader -d trade"]
      interval: 10s
      timeout: 5s
      retries: 5

附录 B:核心依赖版本锁定

版本 说明
typescript ^6.0 语言编译器
bun ≥1.3 运行时
typeorm ^1.0 关系数据 ORM
@timescaledb/typeorm ^0.0.1 TimescaleDB TypeORM 扩展(实验性,仅装饰器映射)
pg ^8.21 PostgreSQL 原生驱动
binance ^3.5 Binance 官方 SDKMainClient + USDMClient + WebSocket
yaml ^2.9 YAML 解析(env.yaml
pino ^10.3 结构化日志
ioredis ^5.11 Redis 客户端(远期)
ccxt ^4.5 多交易所统一 API(远期)
ws ^8.21 通用 WebSocket 客户端(远期)