Files
trade/data/ARCHITECTURE.md
T
Rekey 85a0031a78 feat(data): 实现 Binance WebSocket 适配器与架构重构
- 新增 exchanges/ 模块:MarketDataFeed 统一接口、BaseExchangeAdapter 抽象基类、
  BinanceAdapter 完整实现(WebSocket + REST)
- WebSocket 层基于 binance 官方 SDK 的 WebsocketClient,自动多路复用与断线重连
- REST 层使用 MainClient(Spot),实现 fetchKlines 自动分页补拉 + fetchMarkets 元数据解析
- 数据标准化:Ticker/Trade/OrderBook/Kline 类型定义与 Binance 原生格式互转
- 引入 RxJS Subject 作为统一事件流管道,按 eventType 运行时路由分发
- 重构 config/:YAML 驱动配置加载 + 零依赖运行时校验(fail-fast)
- 重构 db/:TypeORM DataSource 配置 + TimescaleDB K 线 Hypertable 实体
- 新增 utils/logger.ts:Pino 结构化日志(开发环境 pino-pretty 彩色输出)
- 新增 env.yaml 作为 TS/Python 共享的统一环境配置源
- 删除旧版手写 SQL schema 与散落配置文件,收敛到 TypeORM 实体管理
- 安装 rxjs@7.8.2 依赖
2026-06-08 01:24:48 +08:00

50 KiB
Raw Blame History

Trade Data Module — 技术架构说明

模块定位:数字货币量化交易系统的数据层,负责多交易所行情采集、K 线合成、时序数据持久化与实时发布。

运行时Bun | 语言TypeScript 5.x | 数据库PostgreSQL + TimescaleDB | ORMTypeORM + @timescaledb/typeorm


目录

  1. 技术选型与依赖矩阵
  2. 整体架构
  3. 目录结构
  4. 核心模块设计
  5. 数据流生命周期
  6. TypeORM + TimescaleDB 集成细节
  7. 配置管理策略
  8. 日志与可观测性
  9. 错误处理与容错
  10. 性能考量
  11. 开发工作流
  12. 风险提示

1. 技术选型与依赖矩阵

1.1 为什么选择 Bun + TypeScript

决策点 选型 理由
运行时 Bun 原生支持 TypeScript,零配置运行 .ts 文件;内置 bun:sql 可在无 ORM 场景直连 PG;启动速度比 Node.js 快 4×;兼容 Node.js ecosystem
语言 TypeScript 5.x 编译期类型检查,在数据管道中消除 undefined / 类型不匹配导致的运行时崩溃
包管理 bun install 与 Bun 运行时一体,安装速度比 npm 快 25×
测试 Vitest 原生 ESM 支持,与 Bun 兼容良好,内置断言 + 覆盖率

1.2 依赖矩阵

类别 依赖包 版本 用途
ORM 框架 typeorm ^1.0 关系数据(交易对配置、交易所元数据)的 ORM 映射
时序 ORM @timescaledb/typeorm ^0.0.1 TimescaleDB hypertable 实体定义、自动迁移、连续聚合管理
PG 驱动 pg ^8.21 TypeORM 底层 PG 连接;独立批量写入场景的备选路径
Redis ioredis ^5.11 Pub/Sub 行情发布、缓存、跨语言消息队列
交易所 SDK ccxt ^4.5 统一 100+ 交易所 REST API
交易所 WS binance ^3.5 Binance 官方 WebSocket 客户端(ccxt 的 WS 能力较弱)
WebSocket ws ^8.21 通用 WebSocket 客户端(非 Binance 交易所)
日志 pino ^10.3 高性能结构化日志(Bun 下吞吐 > 100k/s
YAML 解析 yaml ^2.9 解析项目根目录 env.yaml 配置文件

1.3 TypeORM + TimescaleDB 分工边界

┌─────────────────────────────────────────────────────────────┐
│                    PostgreSQL 实例                            │
│                                                              │
│  ┌─────────────────────────────────────┐                    │
│  │        TypeORM 管理域(关系数据)      │                    │
│  │  · exchanges(交易所配置)            │                    │
│  │  · trading_pairs(交易对配置)         │                    │
│  │  · symbols(币种元数据)              │                    │
│  │  · data_sources(数据源注册)          │                    │
│  │  → Migration: TypeORM 自动/手动迁移    │                    │
│  └─────────────────────────────────────┘                    │
│                                                              │
│  ┌─────────────────────────────────────┐                    │
│  │  @timescaledb/typeorm 管理域(时序)  │                    │
│  │  · klines_1m1 分钟 K 线 hypertable)│                    │
│  │  · klines_1h(1 小时连续聚合视图)     │                    │
│  │  · trades(逐笔成交 hypertable      │                    │
│  │  → Migration: @timescaledb/typeorm    │                    │
│  │    自动创建 hypertable + 压缩策略     │                    │
│  └─────────────────────────────────────┘                    │
└─────────────────────────────────────────────────────────────┘

设计原则:TypeORM 管理结构稳定的关系数据(schema 变更频率低);@timescaledb/typeorm 管理写入密集型时序数据(利用 TimescaleDB 的自动分区、压缩、保留策略)。


2. 整体架构

┌──────────────────────────────────────────────────────────────────────┐
│                        Trade Data Module (Bun + TS)                   │
│                                                                       │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                           │
│  │ Binance  │  │   OKX    │  │  Bybit   │   ← 交易所 WebSocket 适配器 │
│  │ Adapter  │  │ Adapter  │  │ Adapter  │                           │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                           │
│       │              │              │                                  │
│       └──────────────┼──────────────┘                                  │
│                      ▼                                                 │
│  ┌────────────────────────────────────────┐                          │
│  │         统一行情事件流(RxJS)            │                          │
│  │   ticker$ │ trade$ │ orderbook$         │                          │
│  └──────────────────┬─────────────────────┘                          │
│                     ▼                                                 │
│  ┌────────────────────────────────────────┐                          │
│  │         K 线合成管道(时间桶算法)         │                          │
│  │   Trade → 1m → 5m → 15m → 1h → 4h → 1d │                          │
│  └────────┬───────────────────┬───────────┘                          │
│           ▼                   ▼                                       │
│  ┌────────────────┐  ┌──────────────────┐                            │
│  │ TimescaleDB    │  │  Redis Pub/Sub   │                            │
│  │ @timescaledb/  │  │  (ioredis)       │                            │
│  │ typeorm 写入   │  │  行情实时发布      │                            │
│  └───────┬────────┘  └────────┬─────────┘                            │
│          │                    │                                        │
│  ┌───────▼────────┐  ┌───────▼─────────┐                             │
│  │ 连续聚合视图    │  │  Python 策略引擎 │                             │
│  │ (自动刷新)     │  │  (消费实时行情)  │                             │
│  └────────────────┘  └─────────────────┘                             │
│                                                                       │
│  ┌────────────────────────────────────────┐                          │
│  │  TypeORM 实体(关系数据)                 │                          │
│  │  Exchange / TradingPair / Symbol /      │                          │
│  │  DataSource / Subscription             │                          │
│  └────────────────────────────────────────┘                          │
└──────────────────────────────────────────────────────────────────────┘

3. 目录结构

data/
├── ARCHITECTURE.md                # ← 本文件:技术架构说明
├── package.json                   # 依赖与脚本
├── tsconfig.json                  # TypeScript 配置
├── bun.lock                       # Bun 依赖锁定文件
│
├── config/                        # 中心化配置模块(目录)
│   ├── index.ts                   # 配置加载与分组导出(pgsql / redis / logging
│   └── validators.ts              # 零依赖运行时校验(env.yaml → EnvConfig
│
├── db/                            # 数据库层
│   ├── data-source.ts             # TypeORM DataSource 配置
│   │
│   ├── entities/                  # 所有 ORM 实体(关系 + 时序)
│   │   ├── index.ts               # 实体统一导出
│   │   ├── common.entity.ts       # 公共基类(UUID 主键 + created_at/updated_at
│   │   ├── exchange.entity.ts     # 交易所(binance/okx/bybit
│   │   ├── trading-pair.entity.ts # 交易对(BTCUSDT/ETHUSDT
│   │   └── kline.entity.ts        # TimescaleDB K 线 hypertable1m
│   │
│   └── migrations/                # TypeORM 迁移文件(自动生成)
│       └── .gitkeep
├── exchanges/                     # 交易所适配器(待实现)
├── pipeline/                      # K 线合成管道(待实现)
├── publisher/                     # Redis 数据发布(待实现)
├── types/                         # 共享类型定义
├── utils/                         # 工具函数
├── index.ts                       # 模块入口
├── logger.ts                      # Pino 日志实例
└── tests/                         # 测试

4. 核心模块设计

4.1 配置模块 config/

加载流程:
  项目根目录 env.yaml → parseYaml() 解析 → validateConfig() 零依赖校验
  → 按职责分组导出: pgsql / redis / logging
导出对象 字段 来源 说明
pgsql host, port, database, user, password, max, idleTimeoutMillis, connectionTimeoutMillis env.yaml → 校验 + 硬编码 pg Pool 配置,TypeORM DataSource 复用;max/idleTimeoutMillis/connectionTimeoutMillis 硬编码
redis url, publishEnabled, channelPrefix, retryDelayBaseMs, maxRetries env.yaml → 校验 + 硬编码 ioredis 连接与 Pub/Sub 配置;channelPrefix/retryDelayBaseMs/maxRetries 硬编码
logging level, nodeEnv, pretty env.yaml → 校验 Pino 日志配置;prettynode_env === "development" 推导

设计原则:仅数据库连接、Redis 连接、日志级别等运维配置通过 env.yaml 暴露。配置源为项目根目录的 env.yamlTypeScript/Python 模块共享同一份配置。

校验定义位置config/validators.ts — 零依赖运行时校验(assertString / assertPort / assertBoolean / assertEnum),fail-fast 原则。

配置分组详情

// —— pgsql: 数据库连接 ——
export const pgsql = {
    host: rawConfig.db.host,            // env.yaml default: "localhost"
    port: rawConfig.db.port,            // env.yaml default: 5432
    database: rawConfig.db.name,        // env.yaml default: "trade"
    user: rawConfig.db.user,            // env.yaml default: "trader"
    password: rawConfig.db.password,    // env.yaml required
    max: 20,                            // 硬编码:连接池上限
    idleTimeoutMillis: 30000,           // 硬编码:空闲超时 30s
    connectionTimeoutMillis: 5000,      // 硬编码:连接获取超时 5s
} as const;

// —— redis: 消息队列 ——
export const redis = {
    url: rawConfig.redis.url,                           // env.yaml default: "redis://localhost:6379"
    publishEnabled: rawConfig.redis.publish_enabled,    // env.yaml default: true
    channelPrefix: "trade",                             // 硬编码
    retryDelayBaseMs: 1000,                             // 硬编码
    maxRetries: 10,                                     // 硬编码
} as const;

// —— logging: 日志 ——
export const logging = {
    level: rawConfig.logging.level,         // env.yaml default: "debug"
    nodeEnv: rawConfig.logging.node_env,    // env.yaml default: "development"
    pretty: rawConfig.logging.node_env === "development",
} as const;

4.2 TypeORM 数据源 db/data-source.ts

// data/db/data-source.ts
import { DataSource } from "typeorm";
import { pgsql } from "../config";
import * as entities from "./entities";

export const AppDataSource = new DataSource({
    type: "postgres",
    host: pgsql.host, port: pgsql.port,
    database: pgsql.database,
    username: pgsql.user, password: pgsql.password,
    // 所有实体(关系 + 时序)通过 db/entities/index.ts 统一注册
    entities: [...Object.values(entities)],
    synchronize: true, // 开发环境自动同步 schema
    migrations: [__dirname + "/migrations/*.{ts,js}"],
    extra: {
        max: pgsql.max,
        idleTimeoutMillis: pgsql.idleTimeoutMillis,
        connectionTimeoutMillis: pgsql.connectionTimeoutMillis,
    },
});
await AppDataSource.initialize();

实体注册原则

  • 关系实体Exchange, TradingPair):继承 CommonBaseEntityUUID 主键 + created_at/updated_at),标准 @Entity() 装饰器
  • 时序实体Kline):不继承 CommonBaseEntity,使用 @Hypertable() + @TimeColumn() 自动创建 TimescaleDB hypertable
  • 统一入口:所有实体通过 db/entities/index.ts 导出,...Object.values(entities) 自动注册

4.3 TimescaleDB K 线实体 db/entities/klines/

kline.entity.ts — 1m K 线 Hypertable

// data/db/entities/klines/kline.entity.ts
import {
  Hypertable,
  TimeColumn,
  PartitionColumn,
  Compress,
  ContinuousAggregate,
} from "@timescaledb/typeorm";
import {
  Entity,
  PrimaryColumn,
  Column,
  Unique,
  Index,
  CreateDateColumn,
  UpdateDateColumn,
} from "typeorm";

/**
 * 1 分钟 K 线 Hypertable
 *
 * TimescaleDB 自动按 time 列分区(chunk_time_interval = 1 day),
 * 按 exchange 做空间分区(number_partitions = 4),
 * 关闭 7 天后自动启用列式压缩。
 */
@Hypertable({
  chunk_time_interval: "1 day",        // 每个 chunk 含 1 天数据
  partitioning_column: "exchange",     // 空间分区列
  number_partitions: 4,                // 4 个空间分区
  if_not_exists: true,
})
@Compress({
  segmentby: "exchange, symbol, interval", // 压缩分区键
  orderby: "time DESC",                    // 压缩排序键
  compress_after: "7 days",                // 7 天后自动压缩
})
@Index(["exchange", "symbol", "interval", "time"], { unique: true })
export class Kline {
  @TimeColumn()                // @timescaledb/typeorm: 标记为时间分区列
  @PrimaryColumn("timestamptz")
  time!: Date;

  @PartitionColumn()           // @timescaledb/typeorm: 标记为空间分区列
  @Column("text")
  exchange!: string;

  @Column("text")
  symbol!: string;

  @Column("text")
  interval!: string;           // "1m"

  // === OHLCV 价格数据(NUMERIC(20,8) 精度) ===
  @Column("numeric", { precision: 20, scale: 8 })
  open!: number;

  @Column("numeric", { precision: 20, scale: 8 })
  high!: number;

  @Column("numeric", { precision: 20, scale: 8 })
  low!: number;

  @Column("numeric", { precision: 20, scale: 8 })
  close!: number;

  @Column("numeric", { precision: 20, scale: 8 })
  volume!: number;

  // === 扩展字段 ===
  @Column("numeric", { precision: 20, scale: 8, nullable: true })
  quote_volume?: number;

  @Column("numeric", { precision: 20, scale: 8, nullable: true })
  taker_buy_base_vol?: number;

  @Column("numeric", { precision: 20, scale: 8, nullable: true })
  taker_buy_quote_vol?: number;

  @Column("integer", { nullable: true })
  trade_count?: number;

  @Column("boolean", { default: true })
  is_closed!: boolean;

  // === 审计字段 ===
  @CreateDateColumn("timestamptz")
  created_at!: Date;

  @UpdateDateColumn("timestamptz")
  updated_at!: Date;
}

@timescaledb/typeorm 关键装饰器

装饰器 作用 SQL 等价
@Hypertable() 将实体映射为 TimescaleDB hypertable SELECT create_hypertable(...)
@TimeColumn() 标记时间分区列 -- time 列
@PartitionColumn() 标记空间分区列 partitioning_column =>
@Compress() 配置自动压缩策略 ALTER TABLE SET (timescaledb.compress) + add_compression_policy()
@ContinuousAggregate() 声明连续聚合视图(见下文) CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous)

trade.entity.ts — 逐笔成交 Hypertable

// data/db/entities/klines/trade.entity.ts
/**
 * 逐笔成交记录 Hypertable
 *
 * 存储交易所推送的每笔撮合成交,是 K 线合成器的数据源。
 * 数据量极大(BTCUSDT 高峰 >100tps),保留策略设为 3 天。
 */
@Hypertable({
  chunk_time_interval: "1 hour",       // 高频写入,chunk 更细
  partitioning_column: "exchange",
  number_partitions: 4,
  if_not_exists: true,
})
@Compress({
  segmentby: "exchange, symbol",
  orderby: "trade_time DESC",
  compress_after: "1 day",             // 1 天后即压缩
})
export class Trade {
  @TimeColumn()
  @PrimaryColumn("timestamptz")
  trade_time!: Date;

  @PartitionColumn()
  @Column("text")
  exchange!: string;

  @Column("text")
  symbol!: string;

  @Column("numeric", { precision: 20, scale: 8 })
  price!: number;

  @Column("numeric", { precision: 20, scale: 8 })
  amount!: number;                     // 成交数量(base 币种)

  @Column("numeric", { precision: 20, scale: 8 })
  quote_amount!: number;               // 成交额(quote 币种)

  @Column("boolean")
  is_buyer_maker!: boolean;            // true = 主动卖出, false = 主动买入

  @Column("bigint", { nullable: true })
  trade_id?: number;                   // 交易所成交 ID

  @CreateDateColumn("timestamptz")
  created_at!: Date;
}

连续聚合声明(5m / 15m / 1h / 1d

@timescaledb/typeorm 支持通过实体类声明连续聚合,替代手动编写 SQL 物化视图:

// data/db/entities/klines/kline-1h.entity.ts(示意)
import { ContinuousAggregate } from "@timescaledb/typeorm";

/**
 * 1h K 线连续聚合视图
 *
 * 从 klines (1m) 表自动聚合,刷新策略:每 5 分钟刷新最近 1 天数据。
 * 应用程序查询时自动路由到该视图,无需扫描 1m 表。
 */
@ContinuousAggregate({
  source: () => Kline,              // 源实体(1m K 线)
  bucket_interval: "1 hour",        // 聚合时间桶
  refresh: {
    start_offset: "3 days",
    end_offset: "1 hour",           // 留 1 小时给延迟数据
    schedule_interval: "5 minutes",
  },
})
export class Kline1h {
  @TimeColumn()
  @PrimaryColumn("timestamptz")
  time!: Date;

  @Column("text") exchange!: string;
  @Column("text") symbol!: string;

  // 聚合字段(由 timescaledb 自动填充)
  open!: number;
  high!: number;
  low!: number;
  close!: number;
  volume!: number;
  quote_volume!: number;
  trade_count!: number;
}

注意@timescaledb/typeorm@ContinuousAggregate 装饰器目前处于实验阶段(v0.0.1)。生产环境中建议同时保留 db/init-db/ 下的原始 SQL 建表脚本作为 fallback,通过 Docker 的 /docker-entrypoint-initdb.d/ 自动执行。


4.4 关系数据实体 db/entities/

这些实体由标准 TypeORM 管理,存储业务配置数据。

exchange.entity.ts

// data/db/entities/exchange.entity.ts
@Entity("exchanges")
export class Exchange {
  @PrimaryGeneratedColumn("uuid")
  id!: string;

  @Column("varchar", { length: 50, unique: true })
  name!: string;                    // "binance" | "okx" | "bybit"

  @Column("varchar", { length: 100 })
  label!: string;                   // "Binance" | "OKX" | "Bybit"

  @Column("boolean", { default: true })
  enabled!: boolean;                // 是否启用该交易所

  @Column("jsonb", { nullable: true })
  config?: Record<string, unknown>; // 交易所特定配置(费率、最小下单量等)
}

trading-pair.entity.ts

// data/db/entities/trading-pair.entity.ts
@Entity("trading_pairs")
@Index(["exchange_id", "symbol"], { unique: true })
export class TradingPair {
  @PrimaryGeneratedColumn("uuid")
  id!: string;

  @ManyToOne(() => Exchange)
  @JoinColumn({ name: "exchange_id" })
  exchange!: Exchange;

  @Column("varchar", { length: 20 })
  symbol!: string;                  // "BTCUSDT"

  @Column("varchar", { length: 10 })
  base_asset!: string;              // "BTC"

  @Column("varchar", { length: 10 })
  quote_asset!: string;             // "USDT"

  @Column("numeric", { precision: 20, scale: 8, nullable: true })
  min_qty?: number;                 // 最小下单量

  @Column("numeric", { precision: 20, scale: 8, nullable: true })
  step_size?: number;               // 下单步长

  @Column("boolean", { default: true })
  active!: boolean;                 // 是否订阅
}

symbol.entity.ts

// data/db/entities/symbol.entity.ts
@Entity("symbols")
export class SymbolEntity {
  @PrimaryGeneratedColumn("uuid")
  id!: string;

  @Column("varchar", { length: 10, unique: true })
  asset!: string;                   // "BTC" | "ETH"

  @Column("varchar", { length: 50, nullable: true })
  name?: string;                    // "Bitcoin" | "Ethereum"

  @Column("boolean", { default: false })
  is_stablecoin!: boolean;          // 是否为稳定币
}

实体 ER 关系

┌─────────────┐       ┌──────────────────┐       ┌─────────────┐
│   Exchange   │──1:N──│   TradingPair    │──N:1──│  SymbolEntity│
│              │       │                  │       │              │
│ id (PK)      │       │ id (PK)          │       │ id (PK)      │
│ name         │       │ exchange_id (FK) │       │ asset        │
│ label        │       │ symbol           │       │ name         │
│ enabled      │       │ base_asset       │       │ is_stablecoin│
│ config       │       │ quote_asset      │       │              │
└─────────────┘       │ min_qty          │       └─────────────┘
                      │ step_size        │
                      │ active           │
                      └──────────────────┘

4.5 交易所适配器 exchanges/

// data/exchanges/types.ts
import type { Observable } from "rxjs";

/** 统一行情数据源接口 —— 所有交易所适配器必须实现 */
export interface MarketDataFeed {
  /** 交易所标识 */
  readonly exchange: string;

  /** 建立 WebSocket 连接 */
  connect(): Promise<void>;

  /** 断开连接 */
  disconnect(): Promise<void>;

  /** 订阅实时 Ticker 流 */
  subscribeTicker(symbols: string[]): Observable<Ticker>;

  /** 订阅逐笔成交流 */
  subscribeTrade(symbols: string[]): Observable<Trade>;

  /** 订阅订单簿深度 */
  subscribeOrderbook(symbol: string, depth?: number): Observable<OrderBook>;

  /** REST 拉取历史 K 线(用于补齐缺失数据) */
  fetchKlines(
    symbol: string,
    interval: KlineInterval,
    startTime: number,
    endTime: number,
    limit?: number
  ): Promise<Kline[]>;

  /** 连接状态 */
  readonly connectionState: "disconnected" | "connecting" | "connected" | "error";
}

基类设计 base.ts

// data/exchanges/base.ts
export abstract class BaseExchangeAdapter implements MarketDataFeed {
  abstract readonly exchange: string;

  protected connectionState: "disconnected" | "connecting" | "connected" | "error" =
    "disconnected";

  // 指数退避重连(所有子类复用)
  // 重连参数在各子类构造函数中注入,默认值: baseDelay=3000ms, maxAttempts=10
  protected async reconnect(attempt: number, baseDelayMs = 3000): Promise<void> {
    const delay = baseDelayMs * Math.pow(2, Math.min(attempt, 5));
    logger.warn({ exchange: this.exchange, attempt, delay }, "WebSocket reconnecting");
    await sleep(delay);
    await this.connect();
  }

  abstract connect(): Promise<void>;
  abstract disconnect(): Promise<void>;
  abstract subscribeTicker(symbols: string[]): Observable<Ticker>;
  abstract subscribeTrade(symbols: string[]): Observable<Trade>;
  abstract subscribeOrderbook(symbol: string, depth?: number): Observable<OrderBook>;
  abstract fetchKlines(
    symbol: string, interval: KlineInterval,
    startTime: number, endTime: number, limit?: number
  ): Promise<Kline[]>;
}

适配器实现矩阵

交易所 适配器类 WebSocket 库 REST 库 备注
Binance BinanceAdapter binance (官方 SDK) ccxt 官方 WS 支持组合流(多 symbol 单连接),性能最优
OKX OKXAdapter ccxt.pro ccxt ccxt.pro 内置 WS 管理
Bybit BybitAdapter ws (裸库) ccxt Bybit V5 WebSocket 协议

4.6 K 线合成管道 pipeline/

时间桶算法(核心)

每收到一条 Trade
   1. 计算桶索引: bucketIndex = floor(timestamp / intervalMs)
   2. 若 bucketIndex ≠ 当前桶:
        a. emit 当前桶 K 线(已关闭)
        b. 创建新桶,O=H=L=C=price, V=amount
   3. 若仍在当前桶:
        a. H = max(H, price)
        b. L = min(L, price)
        c. C = price
        d. V += amount

多周期合成策略(混合方案)

Trade 流 ────► 1m 合成器(时间桶,数据源= Trade)
                    │
                    ▼  输出 1m K 线
              ┌─────────────┐
              │ 1m → 5m 聚合 │  ← 5 根 1m → 1 根 5m
              └──────┬──────┘
                     ▼
              ┌──────────────┐
              │ 5m → 15m 聚合 │ ← 3 根 5m → 1 根 15m
              └──────┬───────┘
                     ▼
              ┌──────────────┐
              │ 15m → 1h 聚合 │ ← 4 根 15m → 1 根 1h
              └──────────────┘

设计理由:1m 用 Trade 实时合成(精度最高),5m+ 从低周期聚合(计算量最小)。避免为每个周期维护独立的 Trade 缓存造成内存压力。

核心代码骨架

// data/pipeline/kline-synthesizer.ts
import { Subject, interval } from "rxjs";
import type { Trade, Kline, KlineInterval } from "../types/market";

const INTERVAL_MS: Record<KlineInterval, number> = {
  "1m": 60_000, "5m": 300_000, "15m": 900_000,
  "30m": 1_800_000, "1h": 3_600_000, "4h": 14_400_000,
  "1d": 86_400_000, "1w": 604_800_000,
};

interface Bucket {
  openTime: number;
  open: number;
  high: number;
  low: number;
  close: number;
  volume: number;
  tradeCount: number;
  isClosed: boolean;
}

export class KlineSynthesizer {
  private buckets = new Map<string, Bucket>();

  /**
   * 输入一条 Trade,产出可能的闭合 K 线。
   * 返回 null 表示当前桶未关闭,暂不产出。
   */
  synthesize(
    exchange: string,
    symbol: string,
    interval: KlineInterval,
    trade: Trade,
  ): Kline | null {
    const intervalMs = INTERVAL_MS[interval];
    const bucketTime = Math.floor(trade.timestamp / intervalMs) * intervalMs;
    const key = `${exchange}:${symbol}:${interval}:${bucketTime}`;

    const existing = this.buckets.get(key);
    if (!existing) {
      this.buckets.set(key, {
        openTime: bucketTime,
        open: trade.price,
        high: trade.price,
        low: trade.price,
        close: trade.price,
        volume: trade.amount,
        tradeCount: 1,
        isClosed: false,
      });
      return null; // 新桶第一条,不产出
    }

    existing.high = Math.max(existing.high, trade.price);
    existing.low = Math.min(existing.low, trade.price);
    existing.close = trade.price;
    existing.volume += trade.amount;
    existing.tradeCount += 1;
    return null;
  }

  /** 定时关闭过期桶,返回已闭合 K 线列表 */
  closeExpired(now: number): Kline[] {
    const closed: Kline[] = [];
    for (const [key, bucket] of this.buckets.entries()) {
      const intervalMs = INTERVAL_MS["1m"]; // 仅 1m 合成器有此方法
      if (!bucket.isClosed && now >= bucket.openTime + intervalMs + 15000) {
        bucket.isClosed = true;
        closed.push(/* 构建 Kline 对象 */);
        this.buckets.delete(key);
      }
    }
    return closed;
  }

  /** 从 1m K 线聚合为高周期 K 线 */
  static aggregate(
    klines: Kline[],
    targetInterval: KlineInterval,
  ): Kline[] {
    const intervalMs = INTERVAL_MS[targetInterval];
    const groups = new Map<number, Kline[]>();

    for (const k of klines) {
      const bucketTime = Math.floor(k.openTime / intervalMs) * intervalMs;
      const group = groups.get(bucketTime) ?? [];
      group.push(k);
      groups.set(bucketTime, group);
    }

    return Array.from(groups.entries()).map(([bucketTime, group]) => ({
      openTime: bucketTime,
      closeTime: bucketTime + intervalMs,
      open: group[0]!.open,
      high: Math.max(...group.map(k => k.high)),
      low: Math.min(...group.map(k => k.low)),
      close: group[group.length - 1]!.close,
      volume: group.reduce((sum, k) => sum + k.volume, 0),
      // ...
    }));
  }
}

数据清洗 cleaner.ts

清洗规则 触发条件 处理方式
价格异常 ` price - median
空成交量 volume === 0 && tradeCount === 0 用前一根 K 线的 Close 填充(平盘 K 线)
时间戳乱序 trade.timestamp < lastTimestamp 100ms 排序窗口内重排,超时则丢弃
断线缺失 连续 5 分钟无数据 标记 gap: true,策略端可选择跳过

4.7 数据发布 publisher/

// data/publisher/redis.ts
import Redis from "ioredis";
import { redis as redisConfig } from "../../config";

export class RedisPublisher {
  private client: Redis;
  private readonly prefix: string;

  constructor() {
    this.prefix = redisConfig.channelPrefix; // "trade"
    this.client = new Redis(redisConfig.url, {
      retryStrategy: (times) => {
        if (times > redisConfig.maxRetries) return null; // 停止重试
        return redisConfig.retryDelayBaseMs * Math.pow(2, times);
      },
    });
  }

  /** 发布实时 Ticker */
  async publishTicker(exchange: string, symbol: string, data: Ticker): Promise<void> {
    if (!redisConfig.publishEnabled) return;
    const channel = `${this.prefix}:market:ticker:${exchange}:${symbol}`;
    await this.client.publish(channel, JSON.stringify(data));
  }

  /** 发布 K 线(增量 — 只推送最新一根的 OHLCV 变化) */
  async publishKlineDelta(
    exchange: string, symbol: string, interval: string, delta: KlineDelta,
  ): Promise<void> {
    if (!redisConfig.publishEnabled) return;
    const channel = `${this.prefix}:market:kline:${exchange}:${symbol}:${interval}`;
    await this.client.publish(channel, JSON.stringify(delta));
  }
}

Redis Pub/Sub 频道命名规范

频道模式 说明 消费方
trade:market:ticker:{exchange}:{symbol} 实时 Ticker Python 策略引擎
trade:market:kline:{exchange}:{symbol}:{interval} K 线增量更新 Python 策略引擎
trade:market:trade:{exchange}:{symbol} 逐笔成交 Python 策略引擎
trade:system:heartbeat:data 数据模块心跳 监控告警
trade:system:error:data 数据模块异常 监控告警

5. 数据流生命周期

                     交易所 WebSocket
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
         Binance WS    OKX WS      Bybit WS
              │            │            │
              └────────────┼────────────┘
                           │
                    ┌──────▼──────┐
                    │   RxJS 流    │
                    │ ticker$     │
                    │ trade$      │
                    │ orderbook$  │
                    └──────┬──────┘
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
         K线合成器      数据清洗      格式转换
         (时间桶)      (异常过滤)    (标准化)
              │            │            │
              └────────────┼────────────┘
                           │
              ┌────────────┼────────────┐
              ▼                         ▼
    ┌──────────────────┐    ┌──────────────────┐
    │ @timescaledb/     │    │  Redis Pub/Sub   │
    │ typeorm 批量写入  │    │  实时行情发布      │
    │                   │    │                  │
    │ · Kline (1m)      │    │ · ticker channel │
    │ · Trade (逐笔)    │    │ · kline channel  │
    │                   │    │ · trade channel  │
    └────────┬──────────┘    └────────┬─────────┘
             │                        │
             ▼                        ▼
    ┌──────────────────┐    ┌──────────────────┐
    │ TimescaleDB       │    │ Python 策略引擎   │
    │ 连续聚合刷新      │    │ (消费 + 决策)    │
    │ · klines_5m       │    │                  │
    │ · klines_15m      │    │                  │
    │ · klines_1h       │    │                  │
    │ · klines_1d       │    │                  │
    └──────────────────┘    └──────────────────┘

启动时序

1. 加载配置(config/index.ts → 读取 env.yaml → 零依赖校验)
2. 初始化 Pino 日志
3. TypeORM DataSource.initialize()
   ├── 连接 PostgreSQL
   ├── 自动创建 TimescaleDB 扩展(如不存在)
   ├── 执行 Migration(生产环境)
   └── 注册 @timescaledb/typeorm 订阅者
4. 初始化 Redis 连接(ioredis
5. 从数据库加载 TradingPair 列表(active = true
6. 启动交易所适配器(并行)
   ├── connect() → WebSocket 连接
   └── subscribeTicker/Trade(symbols) → RxJS Observable
7. 启动管道
   ├── KlineSynthesizer 订阅 trade$
   ├── DataCleaner 订阅合成输出
   └── TimescaleDBWriter + RedisPublisher 订阅清洗输出
8. 注册 SIGTERM/SIGINT 优雅关闭

6. TypeORM + TimescaleDB 集成细节

6.1 写入路径选择

场景 方案 原因
关系数据 CRUD(交易对配置、交易所) TypeORM Repository API 标准 ORM 操作,事务支持,类型安全
K 线批量写入(高频) TypeORM Repository upsert() + 批量 TypeORM 的 upsert 方法支持 ON CONFLICT DO UPDATE
K 线海量写入(极端场景) pg Pool + INSERT ... ON CONFLICT 绕过 ORM 开销,直接参数化 SQL,批量 500 条/次
// 方案 A: TypeORM Repository upsert(推荐日常使用)
const klineRepo = AppDataSource.getRepository(Kline);
await klineRepo.upsert(batchRecords, {
  conflictPaths: ["time", "exchange", "symbol", "interval"],
  skipUpdateIfNoValuesChanged: true,
});

// 方案 B: 裸 pg 批量写入(极端性能场景)
import { Pool } from "pg";
const pool = new Pool(pgsql);
await pool.query(`
  INSERT INTO klines (...) VALUES ${values}
  ON CONFLICT (time, exchange, symbol, interval) DO UPDATE SET
    high = GREATEST(klines.high, EXCLUDED.high),
    low = LEAST(klines.low, EXCLUDED.low),
    close = EXCLUDED.close,
    volume = klines.volume + EXCLUDED.volume,
    updated_at = NOW()
`, params);

6.2 批量写入缓冲

// data/pipeline/writer.ts
export class KlineWriter {
  private buffer: Kline[] = [];
  private flushTimer: ReturnType<typeof setInterval>;

  constructor(
    private readonly repo: Repository<Kline>,
    private readonly batchSize = 500,
    private readonly flushIntervalMs = 1000,
  ) {
    // 定时刷新:即使未达阈值,也保证最大延迟 ≤ flushIntervalMs
    this.flushTimer = setInterval(() => this.flush(), this.flushIntervalMs);
  }

  async write(kline: Kline): Promise<void> {
    this.buffer.push(kline);
    if (this.buffer.length >= this.batchSize) {
      await this.flush();
    }
  }

  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer.splice(0, this.batchSize);
    await this.repo.upsert(batch, {
      conflictPaths: ["time", "exchange", "symbol", "interval"],
    });
    logger.debug({ count: batch.length }, "Kline batch flushed");
  }

  async destroy(): Promise<void> {
    clearInterval(this.flushTimer);
    await this.flush(); // 最后刷新残留数据
  }
}

6.3 TimescaleDB 特定配置

通过 @timescaledb/typeorm 在 DataSource 初始化时自动设置:

// db/data-source.ts 初始化后执行
await AppDataSource.initialize();

// @timescaledb/typeorm 会自动:
// 1. CREATE EXTENSION IF NOT EXISTS timescaledb;
// 2. 为 @Hypertable() 实体调用 create_hypertable()
// 3. 为 @Compress() 实体调用 add_compression_policy()
// 4. 为 @ContinuousAggregate() 实体创建物化视图 + 刷新策略

如果 @timescaledb/typeorm 的自动迁移不稳定,推荐 fallback 方案:在 docker-compose.yml 中挂载初始化脚本到 db/init-db/,容器启动时自动执行原始 SQL。


7. 配置管理策略

7.1 配置文件

项目根目录 env.yaml 为统一环境配置源,TypeScript(data/config/)和 Python 模块共享同一份 YAML。

<project_root>/
├── env.yaml          ← 统一配置源(YAMLTS/Python 共享)
└── data/
    └── config/
        ├── index.ts      ← 读取 env.yaml → 校验 → 导出分组配置
        └── validators.ts ← 零依赖运行时校验(assertString / assertPort / assertEnum

7.2 env.yaml 配置段

# --- TimescaleDB / PostgreSQL ---
db:
  host: localhost
  port: 5432
  name: trade
  user: trader
  password: fucketh

# --- Redis ---
redis:
  url: redis://localhost:6379
  publish_enabled: true

# --- 日志 ---
logging:
  level: debug          # trace / debug / info / warn / error / fatal
  node_env: development # development / production / test

7.3 使用方式

// 任何模块中导入配置(所有导出对象均为强类型 as const)
import { pgsql, redis, logging } from "../config";

// TypeORM DataSource
const ds = new DataSource({ type: "postgres", ...pgsql });

// Redis 客户端
const redisClient = new Redis(redis.url);

// 环境判断
if (logging.nodeEnv === "development") {
  logger.info("Config loaded");
}

8. 日志与可观测性

8.1 Pino 日志配置

// data/logger.ts
import pino from "pino";
import { logging } from "../config";

export const logger = pino({
  level: logging.level,
  // 开发环境:使用 pino-pretty 彩色输出
  // 生产环境:JSON 格式,便于 ELK / Loki 采集
  ...(logging.pretty
    ? { transport: { target: "pino-pretty", options: { colorize: true } } }
    : {}),
  // 自动注入模块名
  base: { module: "trade-data" },
  // 序列化 Error 对象
  serializers: {
    err: pino.stdSerializers.err,
  },
});

8.2 日志规范

级别 使用场景 示例
trace 每条 Trade 的详细处理过程 开发调试用,生产关闭
debug 批量写入、WS 心跳 "Kline batch flushed: count=500"
info 模块启动/停止、连接建立 "Binance WS connected: symbols=15"
warn 重连、数据异常、限频触发 "WS reconnecting: attempt=3, delay=12000ms"
error 写入失败、WS 不可恢复 "TimescaleDB write failed: connection refused"
fatal 进程即将退出 "Unrecoverable error, shutting down"

安全原则:严禁在日志中记录 API Key、Secret、数据库密码等敏感信息。使用 logger.info({ apiKey: "***" }) 脱敏。


9. 错误处理与容错

9.1 错误分类与策略

错误类型 处理策略 恢复方式
WebSocket 断线 指数退避重连(基数 3s,上限 10 次) 自动重连 → 重新订阅 → 补齐缺失数据
数据库写入失败 缓冲重试 3 次 → 丢弃并告警 下一批数据正常写入
数据库连接断开 pg Pool 自动重连 TypeORM DataSource 重建
Redis 发布失败 静默丢弃(发布是非关键路径) 下一 tick 继续发布
配置校验失败 进程退出(fail-fast 修正 .env 后重启
交易所 API 限频 令牌桶限流,429 响应冷却 60s 自动恢复

9.2 优雅关闭(Graceful Shutdown

// data/index.ts
async function shutdown(signal: string): Promise<void> {
  logger.info({ signal }, "Shutting down");

  // 1. 停止 WebSocket 连接(停止接收新数据)
  await Promise.all(adapters.map(a => a.disconnect()));

  // 2. 刷新 K 线写入缓冲区
  await klineWriter.destroy();

  // 3. 关闭 TypeORM DataSource
  await AppDataSource.destroy();

  // 4. 关闭 Redis
  await redisPublisher.disconnect();

  logger.info("Shutdown complete");
  process.exit(0);
}

process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));

9.3 重试工具 utils/retry.ts

// data/utils/retry.ts
export async function withRetry<T>(
  fn: () => Promise<T>,
  options: {
    maxAttempts?: number;
    baseDelayMs?: number;
    onRetry?: (attempt: number, error: Error) => void;
  } = {},
): Promise<T> {
  const { maxAttempts = 3, baseDelayMs = 1000, onRetry } = options;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt === maxAttempts) throw err;
      const delay = baseDelayMs * Math.pow(2, attempt - 1);
      onRetry?.(attempt, err as Error);
      logger.warn({ attempt, delay, err }, "Retrying operation");
      await sleep(delay);
    }
  }
  throw new Error("Unreachable");
}

10. 性能考量

10.1 关键性能指标

指标 目标 说明
单笔 Trade 处理延迟 < 1 μs 时间桶 HashMap 查找 + 更新
WebSocket 消息吞吐 > 10,000 msg/s 单进程处理全部订阅对
批量写入延迟 < 100 ms 500 条批量 UPSERT
内存占用(10 symbol × 5 周期) < 50 MB 时间桶 + RxJS 缓存
进程启动时间 < 3 s Bun 启动 + TypeORM 初始化

10.2 优化策略

策略 实现 收益
批量写入 500 条 / 1s 批量刷新 减少 99% DB 连接开销
TimescaleDB 压缩 7 天后自动列式压缩 存储减少 90%+
连续聚合 高周期从低周期视图读取 查询快 100×(1h K 线无需扫描 1m 表)
连接池复用 TypeORM DataSource 管理 20 连接 防止连接数暴涨
RxJS 操作符优化 share() 多播 + bufferTime() 批量 避免重复计算
JSON 序列化 生产环境考虑 MessagePack 体积减少 30-50%,序列化快 2×

10.3 磁盘空间估算

数据量(1m K 线) 未压缩 压缩后(92%
10 币种 × 1 年 ~945 MB ~75 MB
50 币种 × 1 年 ~4.7 GB ~375 MB
200 币种 × 1 年 ~18.9 GB ~1.5 GB

11. 开发工作流

11.1 本地开发

# 1. 启动基础服务
docker compose up -d timescaledb

# 2. 安装依赖
cd data && bun install

# 3. 配置环境
# 编辑项目根目录 env.yaml(如不存在则创建)

# 4. 验证配置加载
bun run db/data-source.ts      # 测试 DataSource 初始化

# 5. 运行测试
bun test

11.2 新增交易所适配器

# 1. 创建适配器文件
touch exchanges/new-exchange.ts

# 2. 实现 MarketDataFeed 接口
# 3. 在 exchanges/types.ts 注册
# 4. 编写测试 tests/exchanges/new-exchange.test.ts
# 5. 如需在数据库中配置,插入 exchanges 表和 trading_pairs 表

11.3 数据库迁移

# 生成迁移文件(开发环境)
bunx typeorm migration:generate db/migrations/AddNewColumn -d db/data-source.ts

# 执行迁移(生产环境)
bunx typeorm migration:run -d db/data-source.ts

# 回滚最近一次迁移
bunx typeorm migration:revert -d db/data-source.ts

11.4 目录与文件命名规范

规范 说明
文件名 kebab-case.ts
类名 PascalCase
函数/变量 camelCase
常量 UPPER_SNAKE_CASE
类型/接口 PascalCase
测试文件 *.test.ts(与源文件同目录或 tests/ 下镜像结构)

12. 风险提示

⚠️ 重要声明:本数据模块为量化交易系统的技术基础设施,仅提供数据采集、存储与发布功能。任何基于本系统构建的交易策略,其盈亏风险由使用者自行承担。本系统作者不提供任何形式的投资建议,不对策略收益做任何承诺。

技术风险

风险 影响 缓解措施
交易所 API 变更 数据断流 适配器模式隔离,单个交易所变更不影响其他
WebSocket 数据丢失 K 线不完整 REST 补拉机制 + gap 标记
数据库写入瓶颈 数据积压 批量写入 + 连接池 + 监控告警
Redis 宕机 策略引擎收不到实时数据 Python 侧增加 TimescaleDB 直读 fallback
时钟偏差 K 线时间戳错位 NTP 时钟同步 + UTC 统一对齐

实盘接入前置条件

  1. 模拟盘环境连续运行 ≥ 2 周无致命错误
  2. 核心模块单元测试覆盖率 ≥ 80%
  3. 数据库写入延迟 p99 < 200ms
  4. WebSocket 断线自动恢复成功率 ≥ 99%
  5. 日志采集与告警链路就绪
  6. 资金隔离:不同策略使用独立子账户
  7. 风控模块在 Python 侧就绪(熔断、仓位限制、最大回撤)

附录 Adocker-compose.yml 数据库配置参考

# docker-compose.yml(项目根目录)
services:
  timescaledb:
    image: timescale/timescaledb-ha:pg17.10-ts2.27.1
    container_name: trade-timescaledb
    restart: unless-stopped
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: trade
      POSTGRES_USER: trader
      POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
    volumes:
      - ./db/pgsql:/var/lib/postgresql          # 数据持久化
      - ./data/init-db:/docker-entrypoint-initdb.d # 自动执行建表 SQLfallback
    command: >
      -c shared_buffers=1GB
      -c effective_cache_size=3GB
      -c maintenance_work_mem=256MB
      -c work_mem=64MB
      -c wal_buffers=64MB
      -c random_page_cost=1.1
      -c effective_io_concurrency=200
      -c max_connections=50
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U trader -d trade"]
      interval: 10s
      timeout: 5s
      retries: 5

附录 B:核心依赖版本锁定

版本 说明
typescript ^6.0 语言编译器
bun ≥1.3 运行时
typeorm ^1.0 关系数据 ORM
@timescaledb/typeorm ^0.0.1 TimescaleDB TypeORM 扩展(实验性)
pg ^8.21 PostgreSQL 原生驱动
yaml ^2.9 YAML 解析(env.yaml
pino ^10.3 日志
ioredis ^5.11 Redis 客户端
ccxt ^4.5 统一交易所 REST API
binance ^3.5 Binance 官方 WebSocket SDK
ws ^8.21 通用 WebSocket 客户端