# Trade Data Module — 技术架构说明 > **模块定位**:数字货币量化交易系统的数据层,负责多交易所行情采集、K 线合成、时序数据持久化与实时发布。 > > **运行时**:[Bun](https://bun.sh) | **语言**:TypeScript 5.x | **数据库**:PostgreSQL + TimescaleDB | **ORM**:TypeORM + @timescaledb/typeorm --- ## 目录 1. [技术选型与依赖矩阵](#1-技术选型与依赖矩阵) 2. [整体架构](#2-整体架构) 3. [目录结构](#3-目录结构) 4. [核心模块设计](#4-核心模块设计) - [4.1 配置模块 (`config/`)](#41-配置模块-config) - [4.2 TypeORM 数据源 (`db/`)](#42-typeorm-数据源-db) - [4.3 TimescaleDB K 线实体 (`db/entities/klines/`)](#43-timescaledb-k-线实体-dbentitiesklines) - [4.4 关系数据实体 (`db/entities/`)](#44-关系数据实体-dbentities) - [4.5 交易所适配器 (`exchanges/`)](#45-交易所适配器-exchanges) - [4.6 K 线合成管道 (`pipeline/`)](#46-k-线合成管道-pipeline) - [4.7 数据发布 (`publisher/`)](#47-数据发布-publisher) - [4.8 数据补全服务 (`run/exchange.ts`)](#48-数据补全服务-runexchangets) 5. [数据流生命周期](#5-数据流生命周期) 6. [TypeORM + TimescaleDB 集成细节](#6-typeorm--timescaledb-集成细节) 7. [配置管理策略](#7-配置管理策略) 8. [日志与可观测性](#8-日志与可观测性) 9. [错误处理与容错](#9-错误处理与容错) 10. [性能考量](#10-性能考量) 11. [开发工作流](#11-开发工作流) 12. [风险提示](#12-风险提示) --- ## 1. 技术选型与依赖矩阵 ### 1.1 为什么选择 Bun + TypeScript? | 决策点 | 选型 | 理由 | |--------|------|------| | **运行时** | Bun | 原生支持 TypeScript,零配置运行 `.ts` 文件;内置 `bun:sql` 可在无 ORM 场景直连 PG;启动速度比 Node.js 快 4×;兼容 Node.js ecosystem | | **语言** | TypeScript 5.x | 编译期类型检查,在数据管道中消除 `undefined` / 类型不匹配导致的运行时崩溃 | | **包管理** | `bun install` | 与 Bun 运行时一体,安装速度比 npm 快 25× | | **测试** | Vitest | 原生 ESM 支持,与 Bun 兼容良好,内置断言 + 覆盖率 | ### 1.2 依赖矩阵 | 类别 | 依赖包 | 版本 | 用途 | |------|--------|------|------| | **ORM 框架** | `typeorm` | ^1.0 | 关系数据(交易对配置、交易所元数据)的 ORM 映射 | | **时序 ORM** | `@timescaledb/typeorm` | ^0.0.1 | TimescaleDB hypertable 实体定义、自动迁移、连续聚合管理 | | **PG 驱动** | `pg` | ^8.21 | TypeORM 底层 PG 连接;独立批量写入场景的备选路径 | | **Redis** | `ioredis` | ^5.11 | Pub/Sub 行情发布、缓存、跨语言消息队列 | | **交易所 SDK** | `ccxt` | ^4.5 | 统一 100+ 交易所 REST API | | **交易所 WS** | `binance` | ^3.5 | Binance 官方 WebSocket 客户端(ccxt 的 WS 能力较弱) | | **WebSocket** | `ws` | ^8.21 | 通用 WebSocket 客户端(非 Binance 交易所) | | **日志** | `pino` | ^10.3 | 高性能结构化日志(Bun 下吞吐 > 100k/s) | | **YAML 解析** | `yaml` | ^2.9 | 解析项目根目录 env.yaml 配置文件 | ### 1.3 TypeORM + TimescaleDB 分工边界 ``` ┌─────────────────────────────────────────────────────────────┐ │ PostgreSQL 实例 │ │ │ │ ┌─────────────────────────────────────┐ │ │ │ TypeORM 管理域(关系数据) │ │ │ │ · exchanges(交易所配置) │ │ │ │ · trading_pairs(交易对配置) │ │ │ │ · symbols(币种元数据) │ │ │ │ · data_sources(数据源注册) │ │ │ │ → Migration: TypeORM 自动/手动迁移 │ │ │ └─────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────┐ │ │ │ @timescaledb/typeorm 管理域(时序) │ │ │ │ · klines_1m(1 分钟 K 线 hypertable)│ │ │ │ · klines_1h(1 小时连续聚合视图) │ │ │ │ · trades(逐笔成交 hypertable) │ │ │ │ → Migration: @timescaledb/typeorm │ │ │ │ 自动创建 hypertable + 压缩策略 │ │ │ └─────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` > **设计原则**:TypeORM 管理结构稳定的关系数据(schema 变更频率低);`@timescaledb/typeorm` 管理写入密集型时序数据(利用 TimescaleDB 的自动分区、压缩、保留策略)。 --- ## 2. 整体架构 ``` ┌──────────────────────────────────────────────────────────────────────┐ │ Trade Data Module (Bun + TS) │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Binance │ │ OKX │ │ Bybit │ ← 交易所 WebSocket 适配器 │ │ │ Adapter │ │ Adapter │ │ Adapter │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ └──────────────┼──────────────┘ │ │ ▼ │ │ ┌────────────────────────────────────────┐ │ │ │ 统一行情事件流(RxJS) │ │ │ │ ticker$ │ trade$ │ orderbook$ │ │ │ └──────────────────┬─────────────────────┘ │ │ ▼ │ │ ┌────────────────────────────────────────┐ │ │ │ K 线合成管道(时间桶算法) │ │ │ │ Trade → 1m → 5m → 15m → 1h → 4h → 1d │ │ │ └────────┬───────────────────┬───────────┘ │ │ ▼ ▼ │ │ ┌────────────────┐ ┌──────────────────┐ │ │ │ TimescaleDB │ │ Redis Pub/Sub │ │ │ │ @timescaledb/ │ │ (ioredis) │ │ │ │ typeorm 写入 │ │ 行情实时发布 │ │ │ └───────┬────────┘ └────────┬─────────┘ │ │ │ │ │ │ ┌───────▼────────┐ ┌───────▼─────────┐ │ │ │ 连续聚合视图 │ │ Python 策略引擎 │ │ │ │ (自动刷新) │ │ (消费实时行情) │ │ │ └────────────────┘ └─────────────────┘ │ │ │ │ ┌────────────────────────────────────────┐ │ │ │ TypeORM 实体(关系数据) │ │ │ │ Exchange / TradingPair / Symbol / │ │ │ │ DataSource / Subscription │ │ │ └────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────────┘ ``` --- ## 3. 目录结构 ``` data/ ├── ARCHITECTURE.md # ← 本文件:技术架构说明 ├── package.json # 依赖与脚本 ├── tsconfig.json # TypeScript 配置 ├── bun.lock # Bun 依赖锁定文件 │ ├── run/ # 启动文件 │ ├── main.ts # 模块入口:配置加载 → DB 连接 → Redis 连接 → 适配器启动 → 优雅关闭 │ └── exchange.ts # 数据补全服务:读取 trading_pairs.last_backfill_time → 拉取缺失 K 线 → 批量写入 → 更新时间戳 │ ├── config/ # 中心化配置模块(目录) │ ├── index.ts # 配置加载与分组导出(pgsql / redis / logging) │ └── validators.ts # 零依赖运行时校验(env.yaml → EnvConfig) │ ├── db/ # 数据库层 │ ├── data-source.ts # TypeORM DataSource 配置 │ │ │ ├── entities/ # 所有 ORM 实体(关系 + 时序) │ │ ├── index.ts # 实体统一导出 │ │ ├── common.entity.ts # 公共基类(UUID 主键 + created_at/updated_at) │ │ ├── exchange.entity.ts # 交易所(binance/okx/bybit) │ │ ├── trading-pair.entity.ts # 交易对(BTCUSDT/ETHUSDT) │ │ └── kline.entity.ts # TimescaleDB K 线 hypertable(1m) │ │ │ └── migrations/ # TypeORM 迁移文件(自动生成) │ └── .gitkeep ├── exchanges/ # 交易所适配器(待实现) ├── pipeline/ # K 线合成管道(待实现) ├── publisher/ # Redis 数据发布(待实现) ├── types/ # 共享类型定义 ├── utils/ # 工具函数 └── tests/ # 测试 ``` --- ## 4. 核心模块设计 ### 4.1 配置模块 [`config/`](config/index.ts:1) ``` 加载流程: 项目根目录 env.yaml → parseYaml() 解析 → validateConfig() 零依赖校验 → 按职责分组导出: pgsql / redis / logging ``` | 导出对象 | 字段 | 来源 | 说明 | |----------|------|------|------| | `pgsql` | host, port, database, user, password, max, idleTimeoutMillis, connectionTimeoutMillis | `env.yaml` → 校验 + 硬编码 | pg Pool 配置,TypeORM DataSource 复用;`max`/`idleTimeoutMillis`/`connectionTimeoutMillis` 硬编码 | | `redis` | url, publishEnabled, channelPrefix, retryDelayBaseMs, maxRetries | `env.yaml` → 校验 + 硬编码 | ioredis 连接与 Pub/Sub 配置;`channelPrefix`/`retryDelayBaseMs`/`maxRetries` 硬编码 | | `logging` | level, nodeEnv, pretty | `env.yaml` → 校验 | Pino 日志配置;`pretty` 由 `node_env === "development"` 推导 | > **设计原则**:仅数据库连接、Redis 连接、日志级别等运维配置通过 `env.yaml` 暴露。配置源为项目根目录的 `env.yaml`,TypeScript/Python 模块共享同一份配置。 **校验定义位置**:[`config/validators.ts`](config/validators.ts) — 零依赖运行时校验(`assertString` / `assertPort` / `assertBoolean` / `assertEnum`),fail-fast 原则。 **配置分组详情**: ```typescript // —— pgsql: 数据库连接 —— export const pgsql = { host: rawConfig.db.host, // env.yaml default: "localhost" port: rawConfig.db.port, // env.yaml default: 5432 database: rawConfig.db.name, // env.yaml default: "trade" user: rawConfig.db.user, // env.yaml default: "trader" password: rawConfig.db.password, // env.yaml required max: 20, // 硬编码:连接池上限 idleTimeoutMillis: 30000, // 硬编码:空闲超时 30s connectionTimeoutMillis: 5000, // 硬编码:连接获取超时 5s } as const; // —— redis: 消息队列 —— export const redis = { url: rawConfig.redis.url, // env.yaml default: "redis://localhost:6379" publishEnabled: rawConfig.redis.publish_enabled, // env.yaml default: true channelPrefix: "trade", // 硬编码 retryDelayBaseMs: 1000, // 硬编码 maxRetries: 10, // 硬编码 } as const; // —— logging: 日志 —— export const logging = { level: rawConfig.logging.level, // env.yaml default: "debug" nodeEnv: rawConfig.logging.node_env, // env.yaml default: "development" pretty: rawConfig.logging.node_env === "development", } as const; ``` --- ### 4.2 TypeORM 数据源 [`db/data-source.ts`](db/data-source.ts) ```typescript // data/db/data-source.ts import { DataSource } from "typeorm"; import { pgsql } from "../config"; import * as entities from "./entities"; export const AppDataSource = new DataSource({ type: "postgres", host: pgsql.host, port: pgsql.port, database: pgsql.database, username: pgsql.user, password: pgsql.password, // 所有实体(关系 + 时序)通过 db/entities/index.ts 统一注册 entities: [...Object.values(entities)], synchronize: true, // 开发环境自动同步 schema migrations: [__dirname + "/migrations/*.{ts,js}"], extra: { max: pgsql.max, idleTimeoutMillis: pgsql.idleTimeoutMillis, connectionTimeoutMillis: pgsql.connectionTimeoutMillis, }, }); await AppDataSource.initialize(); ``` #### 实体注册原则 - **关系实体**(`Exchange`, `TradingPair`):继承 `CommonBaseEntity`(UUID 主键 + created_at/updated_at),标准 `@Entity()` 装饰器 - **时序实体**(`Kline`):不继承 `CommonBaseEntity`,使用 `@Hypertable()` + `@TimeColumn()` 自动创建 TimescaleDB hypertable - **统一入口**:所有实体通过 [`db/entities/index.ts`](db/entities/index.ts) 导出,`...Object.values(entities)` 自动注册 --- ### 4.3 TimescaleDB K 线实体 [`db/entities/klines/`](db/entities/klines/) #### [`kline.entity.ts`](db/entities/klines/kline.entity.ts) — 1m K 线 Hypertable ```typescript // data/db/entities/klines/kline.entity.ts import { Hypertable, TimeColumn, PartitionColumn, Compress, ContinuousAggregate, } from "@timescaledb/typeorm"; import { Entity, PrimaryColumn, Column, Unique, Index, CreateDateColumn, UpdateDateColumn, } from "typeorm"; /** * 1 分钟 K 线 Hypertable * * TimescaleDB 自动按 time 列分区(chunk_time_interval = 1 day), * 按 exchange 做空间分区(number_partitions = 4), * 关闭 7 天后自动启用列式压缩。 */ @Hypertable({ chunk_time_interval: "1 day", // 每个 chunk 含 1 天数据 partitioning_column: "exchange", // 空间分区列 number_partitions: 4, // 4 个空间分区 if_not_exists: true, }) @Compress({ segmentby: "exchange, symbol, interval", // 压缩分区键 orderby: "time DESC", // 压缩排序键 compress_after: "7 days", // 7 天后自动压缩 }) @Index(["exchange", "symbol", "interval", "time"], { unique: true }) export class Kline { @TimeColumn() // @timescaledb/typeorm: 标记为时间分区列 @PrimaryColumn("timestamptz") time!: Date; @PartitionColumn() // @timescaledb/typeorm: 标记为空间分区列 @Column("text") exchange!: string; @Column("text") symbol!: string; @Column("text") interval!: string; // "1m" // === OHLCV 价格数据(NUMERIC(20,8) 精度) === @Column("numeric", { precision: 20, scale: 8 }) open!: number; @Column("numeric", { precision: 20, scale: 8 }) high!: number; @Column("numeric", { precision: 20, scale: 8 }) low!: number; @Column("numeric", { precision: 20, scale: 8 }) close!: number; @Column("numeric", { precision: 20, scale: 8 }) volume!: number; // === 扩展字段 === @Column("numeric", { precision: 20, scale: 8, nullable: true }) quote_volume?: number; @Column("numeric", { precision: 20, scale: 8, nullable: true }) taker_buy_base_vol?: number; @Column("numeric", { precision: 20, scale: 8, nullable: true }) taker_buy_quote_vol?: number; @Column("integer", { nullable: true }) trade_count?: number; @Column("boolean", { default: true }) is_closed!: boolean; // === 审计字段 === @CreateDateColumn("timestamptz") created_at!: Date; @UpdateDateColumn("timestamptz") updated_at!: Date; } ``` > **@timescaledb/typeorm 关键装饰器**: > > | 装饰器 | 作用 | SQL 等价 | > |--------|------|----------| > | `@Hypertable()` | 将实体映射为 TimescaleDB hypertable | `SELECT create_hypertable(...)` | > | `@TimeColumn()` | 标记时间分区列 | `-- time 列` | > | `@PartitionColumn()` | 标记空间分区列 | `partitioning_column =>` | > | `@Compress()` | 配置自动压缩策略 | `ALTER TABLE SET (timescaledb.compress)` + `add_compression_policy()` | > | `@ContinuousAggregate()` | 声明连续聚合视图(见下文) | `CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous)` | #### [`trade.entity.ts`](db/entities/klines/trade.entity.ts) — 逐笔成交 Hypertable ```typescript // data/db/entities/klines/trade.entity.ts /** * 逐笔成交记录 Hypertable * * 存储交易所推送的每笔撮合成交,是 K 线合成器的数据源。 * 数据量极大(BTCUSDT 高峰 >100tps),保留策略设为 3 天。 */ @Hypertable({ chunk_time_interval: "1 hour", // 高频写入,chunk 更细 partitioning_column: "exchange", number_partitions: 4, if_not_exists: true, }) @Compress({ segmentby: "exchange, symbol", orderby: "trade_time DESC", compress_after: "1 day", // 1 天后即压缩 }) export class Trade { @TimeColumn() @PrimaryColumn("timestamptz") trade_time!: Date; @PartitionColumn() @Column("text") exchange!: string; @Column("text") symbol!: string; @Column("numeric", { precision: 20, scale: 8 }) price!: number; @Column("numeric", { precision: 20, scale: 8 }) amount!: number; // 成交数量(base 币种) @Column("numeric", { precision: 20, scale: 8 }) quote_amount!: number; // 成交额(quote 币种) @Column("boolean") is_buyer_maker!: boolean; // true = 主动卖出, false = 主动买入 @Column("bigint", { nullable: true }) trade_id?: number; // 交易所成交 ID @CreateDateColumn("timestamptz") created_at!: Date; } ``` #### 连续聚合声明(5m / 15m / 1h / 1d) `@timescaledb/typeorm` 支持通过实体类声明连续聚合,替代手动编写 SQL 物化视图: ```typescript // data/db/entities/klines/kline-1h.entity.ts(示意) import { ContinuousAggregate } from "@timescaledb/typeorm"; /** * 1h K 线连续聚合视图 * * 从 klines (1m) 表自动聚合,刷新策略:每 5 分钟刷新最近 1 天数据。 * 应用程序查询时自动路由到该视图,无需扫描 1m 表。 */ @ContinuousAggregate({ source: () => Kline, // 源实体(1m K 线) bucket_interval: "1 hour", // 聚合时间桶 refresh: { start_offset: "3 days", end_offset: "1 hour", // 留 1 小时给延迟数据 schedule_interval: "5 minutes", }, }) export class Kline1h { @TimeColumn() @PrimaryColumn("timestamptz") time!: Date; @Column("text") exchange!: string; @Column("text") symbol!: string; // 聚合字段(由 timescaledb 自动填充) open!: number; high!: number; low!: number; close!: number; volume!: number; quote_volume!: number; trade_count!: number; } ``` > **注意**:`@timescaledb/typeorm` 的 `@ContinuousAggregate` 装饰器目前处于实验阶段(v0.0.1)。生产环境中建议同时保留 `db/init-db/` 下的原始 SQL 建表脚本作为 fallback,通过 Docker 的 `/docker-entrypoint-initdb.d/` 自动执行。 --- ### 4.4 关系数据实体 [`db/entities/`](db/entities/) 这些实体由标准 TypeORM 管理,存储业务配置数据。 #### [`exchange.entity.ts`](db/entities/exchange.entity.ts) ```typescript // data/db/entities/exchange.entity.ts @Entity("exchanges") export class Exchange { @PrimaryGeneratedColumn("uuid") id!: string; @Column("varchar", { length: 50, unique: true }) name!: string; // "binance" | "okx" | "bybit" @Column("varchar", { length: 100 }) label!: string; // "Binance" | "OKX" | "Bybit" @Column("boolean", { default: true }) enabled!: boolean; // 是否启用该交易所 @Column("jsonb", { nullable: true }) config?: Record; // 交易所特定配置(费率、最小下单量等) } ``` #### [`trading-pair.entity.ts`](db/entities/trading-pair.entity.ts) ```typescript // data/db/entities/trading-pair.entity.ts @Entity("trading_pairs") @Index(["exchange_id", "symbol"], { unique: true }) export class TradingPair { @PrimaryGeneratedColumn("uuid") id!: string; @ManyToOne(() => Exchange) @JoinColumn({ name: "exchange_id" }) exchange!: Exchange; @Column("varchar", { length: 20 }) symbol!: string; // "BTCUSDT" @Column("varchar", { length: 10 }) base_asset!: string; // "BTC" @Column("varchar", { length: 10 }) quote_asset!: string; // "USDT" @Column("numeric", { precision: 20, scale: 8, nullable: true }) min_qty?: number; // 最小下单量 @Column("numeric", { precision: 20, scale: 8, nullable: true }) step_size?: number; // 下单步长 @Column("boolean", { default: true }) active!: boolean; // 是否订阅 } ``` #### [`symbol.entity.ts`](db/entities/symbol.entity.ts) ```typescript // data/db/entities/symbol.entity.ts @Entity("symbols") export class SymbolEntity { @PrimaryGeneratedColumn("uuid") id!: string; @Column("varchar", { length: 10, unique: true }) asset!: string; // "BTC" | "ETH" @Column("varchar", { length: 50, nullable: true }) name?: string; // "Bitcoin" | "Ethereum" @Column("boolean", { default: false }) is_stablecoin!: boolean; // 是否为稳定币 } ``` #### 实体 ER 关系 ``` ┌─────────────┐ ┌──────────────────┐ ┌─────────────┐ │ Exchange │──1:N──│ TradingPair │──N:1──│ SymbolEntity│ │ │ │ │ │ │ │ id (PK) │ │ id (PK) │ │ id (PK) │ │ name │ │ exchange_id (FK) │ │ asset │ │ label │ │ symbol │ │ name │ │ enabled │ │ base_asset │ │ is_stablecoin│ │ config │ │ quote_asset │ │ │ └─────────────┘ │ min_qty │ └─────────────┘ │ step_size │ │ active │ └──────────────────┘ ``` --- ### 4.5 交易所适配器 [`exchanges/`](exchanges/) ```typescript // data/exchanges/types.ts import type { Observable } from "rxjs"; /** 统一行情数据源接口 —— 所有交易所适配器必须实现 */ export interface MarketDataFeed { /** 交易所标识 */ readonly exchange: string; /** 建立 WebSocket 连接 */ connect(): Promise; /** 断开连接 */ disconnect(): Promise; /** 订阅实时 Ticker 流 */ subscribeTicker(symbols: string[]): Observable; /** 订阅逐笔成交流 */ subscribeTrade(symbols: string[]): Observable; /** 订阅订单簿深度 */ subscribeOrderbook(symbol: string, depth?: number): Observable; /** REST 拉取历史 K 线(用于补齐缺失数据) */ fetchKlines( symbol: string, interval: KlineInterval, startTime: number, endTime: number, limit?: number ): Promise; /** 连接状态 */ readonly connectionState: "disconnected" | "connecting" | "connected" | "error"; } ``` #### 基类设计 [`base.ts`](exchanges/base.ts) ```typescript // data/exchanges/base.ts export abstract class BaseExchangeAdapter implements MarketDataFeed { abstract readonly exchange: string; protected connectionState: "disconnected" | "connecting" | "connected" | "error" = "disconnected"; // 指数退避重连(所有子类复用) // 重连参数在各子类构造函数中注入,默认值: baseDelay=3000ms, maxAttempts=10 protected async reconnect(attempt: number, baseDelayMs = 3000): Promise { const delay = baseDelayMs * Math.pow(2, Math.min(attempt, 5)); logger.warn({ exchange: this.exchange, attempt, delay }, "WebSocket reconnecting"); await sleep(delay); await this.connect(); } abstract connect(): Promise; abstract disconnect(): Promise; abstract subscribeTicker(symbols: string[]): Observable; abstract subscribeTrade(symbols: string[]): Observable; abstract subscribeOrderbook(symbol: string, depth?: number): Observable; abstract fetchKlines( symbol: string, interval: KlineInterval, startTime: number, endTime: number, limit?: number ): Promise; } ``` **适配器实现矩阵**: | 交易所 | 适配器类 | WebSocket 库 | REST 库 | 备注 | |--------|---------|-------------|---------|------| | Binance | `BinanceAdapter` | `binance` (官方 SDK) | `ccxt` | 官方 WS 支持组合流(多 symbol 单连接),性能最优 | | OKX | `OKXAdapter` | `ccxt.pro` | `ccxt` | ccxt.pro 内置 WS 管理 | | Bybit | `BybitAdapter` | `ws` (裸库) | `ccxt` | Bybit V5 WebSocket 协议 | --- ### 4.6 K 线合成管道 [`pipeline/`](pipeline/) #### 时间桶算法(核心) ``` 每收到一条 Trade: 1. 计算桶索引: bucketIndex = floor(timestamp / intervalMs) 2. 若 bucketIndex ≠ 当前桶: a. emit 当前桶 K 线(已关闭) b. 创建新桶,O=H=L=C=price, V=amount 3. 若仍在当前桶: a. H = max(H, price) b. L = min(L, price) c. C = price d. V += amount ``` #### 多周期合成策略(混合方案) ``` Trade 流 ────► 1m 合成器(时间桶,数据源= Trade) │ ▼ 输出 1m K 线 ┌─────────────┐ │ 1m → 5m 聚合 │ ← 5 根 1m → 1 根 5m └──────┬──────┘ ▼ ┌──────────────┐ │ 5m → 15m 聚合 │ ← 3 根 5m → 1 根 15m └──────┬───────┘ ▼ ┌──────────────┐ │ 15m → 1h 聚合 │ ← 4 根 15m → 1 根 1h └──────────────┘ ``` > **设计理由**:1m 用 Trade 实时合成(精度最高),5m+ 从低周期聚合(计算量最小)。避免为每个周期维护独立的 Trade 缓存造成内存压力。 #### 核心代码骨架 ```typescript // data/pipeline/kline-synthesizer.ts import { Subject, interval } from "rxjs"; import type { Trade, Kline, KlineInterval } from "../types/market"; const INTERVAL_MS: Record = { "1m": 60_000, "5m": 300_000, "15m": 900_000, "30m": 1_800_000, "1h": 3_600_000, "4h": 14_400_000, "1d": 86_400_000, "1w": 604_800_000, }; interface Bucket { openTime: number; open: number; high: number; low: number; close: number; volume: number; tradeCount: number; isClosed: boolean; } export class KlineSynthesizer { private buckets = new Map(); /** * 输入一条 Trade,产出可能的闭合 K 线。 * 返回 null 表示当前桶未关闭,暂不产出。 */ synthesize( exchange: string, symbol: string, interval: KlineInterval, trade: Trade, ): Kline | null { const intervalMs = INTERVAL_MS[interval]; const bucketTime = Math.floor(trade.timestamp / intervalMs) * intervalMs; const key = `${exchange}:${symbol}:${interval}:${bucketTime}`; const existing = this.buckets.get(key); if (!existing) { this.buckets.set(key, { openTime: bucketTime, open: trade.price, high: trade.price, low: trade.price, close: trade.price, volume: trade.amount, tradeCount: 1, isClosed: false, }); return null; // 新桶第一条,不产出 } existing.high = Math.max(existing.high, trade.price); existing.low = Math.min(existing.low, trade.price); existing.close = trade.price; existing.volume += trade.amount; existing.tradeCount += 1; return null; } /** 定时关闭过期桶,返回已闭合 K 线列表 */ closeExpired(now: number): Kline[] { const closed: Kline[] = []; for (const [key, bucket] of this.buckets.entries()) { const intervalMs = INTERVAL_MS["1m"]; // 仅 1m 合成器有此方法 if (!bucket.isClosed && now >= bucket.openTime + intervalMs + 15000) { bucket.isClosed = true; closed.push(/* 构建 Kline 对象 */); this.buckets.delete(key); } } return closed; } /** 从 1m K 线聚合为高周期 K 线 */ static aggregate( klines: Kline[], targetInterval: KlineInterval, ): Kline[] { const intervalMs = INTERVAL_MS[targetInterval]; const groups = new Map(); for (const k of klines) { const bucketTime = Math.floor(k.openTime / intervalMs) * intervalMs; const group = groups.get(bucketTime) ?? []; group.push(k); groups.set(bucketTime, group); } return Array.from(groups.entries()).map(([bucketTime, group]) => ({ openTime: bucketTime, closeTime: bucketTime + intervalMs, open: group[0]!.open, high: Math.max(...group.map(k => k.high)), low: Math.min(...group.map(k => k.low)), close: group[group.length - 1]!.close, volume: group.reduce((sum, k) => sum + k.volume, 0), // ... })); } } ``` #### 数据清洗 [`cleaner.ts`](pipeline/cleaner.ts) | 清洗规则 | 触发条件 | 处理方式 | |----------|----------|----------| | 价格异常 | `|price - median| > 10 * MAD` | 丢弃该条 Trade | | 空成交量 | `volume === 0 && tradeCount === 0` | 用前一根 K 线的 Close 填充(平盘 K 线) | | 时间戳乱序 | `trade.timestamp < lastTimestamp` | 100ms 排序窗口内重排,超时则丢弃 | | 断线缺失 | 连续 5 分钟无数据 | 标记 `gap: true`,策略端可选择跳过 | --- ### 4.7 数据发布 [`publisher/`](publisher/) ```typescript // data/publisher/redis.ts import Redis from "ioredis"; import { redis as redisConfig } from "../../config"; export class RedisPublisher { private client: Redis; private readonly prefix: string; constructor() { this.prefix = redisConfig.channelPrefix; // "trade" this.client = new Redis(redisConfig.url, { retryStrategy: (times) => { if (times > redisConfig.maxRetries) return null; // 停止重试 return redisConfig.retryDelayBaseMs * Math.pow(2, times); }, }); } /** 发布实时 Ticker */ async publishTicker(exchange: string, symbol: string, data: Ticker): Promise { if (!redisConfig.publishEnabled) return; const channel = `${this.prefix}:market:ticker:${exchange}:${symbol}`; await this.client.publish(channel, JSON.stringify(data)); } /** 发布 K 线(增量 — 只推送最新一根的 OHLCV 变化) */ async publishKlineDelta( exchange: string, symbol: string, interval: string, delta: KlineDelta, ): Promise { if (!redisConfig.publishEnabled) return; const channel = `${this.prefix}:market:kline:${exchange}:${symbol}:${interval}`; await this.client.publish(channel, JSON.stringify(delta)); } } ``` **Redis Pub/Sub 频道命名规范**: | 频道模式 | 说明 | 消费方 | |----------|------|--------| | `trade:market:ticker:{exchange}:{symbol}` | 实时 Ticker | Python 策略引擎 | | `trade:market:kline:{exchange}:{symbol}:{interval}` | K 线增量更新 | Python 策略引擎 | | `trade:market:trade:{exchange}:{symbol}` | 逐笔成交 | Python 策略引擎 | | `trade:system:heartbeat:data` | 数据模块心跳 | 监控告警 | | `trade:system:error:data` | 数据模块异常 | 监控告警 | --- ### 4.8 数据补全服务 [`run/exchange.ts`](run/exchange.ts) 独立启动的数据补全服务,从 `trading_pairs` 表中读取每个交易对的 `last_backfill_time`,据此确定需要拉取的历史 K 线范围,补全完成后将 `last_backfill_time` 更新为最新时间点。 **核心机制 — 基于 `last_backfill_time` 的增量补全**: ``` trading_pairs 表: ┌────┬──────────┬──────────────────────┬──────────────────────┐ │ id │ symbol │ last_backfill_time │ kline_intervals │ ├────┼──────────┼──────────────────────┼──────────────────────┤ │ 1 │ BTCUSDT │ 2026-06-07 12:00:00 │ 1m,5m,15m,1h,4h,1d │ │ 2 │ ETHUSDT │ 2026-06-08 08:00:00 │ 1m,5m,1h,1d │ └────┴──────────┴──────────────────────┴──────────────────────┘ 补全任务生成: BTCUSDT → [(1m, 06/07 12:00 → now), (5m, 06/07 12:00 → now), ...] ETHUSDT → [(1m, 06/08 08:00 → now), (5m, 06/08 08:00 → now), ...] ``` - `last_backfill_time` 初始值为 `1970-01-01T00:00:00Z`(epoch 起点),新交易对自动触发全量拉取 - 每次补全完成后,更新为本次实际拉取到的最后一条 K 线时间 - 下次运行时自动从上次结束位置继续,无重复拉取 **使用场景**: | 场景 | 触发方式 | 说明 | |------|----------|------| | **定期增量补全** | cron 定时触发 | 每日/每小时补齐最新数据 | | **首次上线初始化** | 手动执行 | 新交易对 `last_backfill_time` 为 epoch,自动拉全量历史 | | **定点修复** | `--start` / `--end` 覆盖 | 修复特定时间段的缺失数据 | | **补全后验证** | `--dry-run` | 仅展示需拉取的任务范围,不实际请求 | **命令行参数**: ```bash # 全量模式:为所有 active 交易对执行增量补全 bun run run/exchange.ts --concurrency 2 # 指定交易对: bun run run/exchange.ts --symbols BTCUSDT,ETHUSDT # 手动覆盖时间范围(忽略 last_backfill_time): bun run run/exchange.ts \ --symbols BTCUSDT \ --start "2026-06-01T00:00:00Z" \ --end "2026-06-08T00:00:00Z" # 仅检测不拉取: bun run run/exchange.ts --dry-run ``` | 参数 | 默认值 | 说明 | |------|--------|------| | `--exchange` | (从 DB 读取) | 限定交易所,不填则为所有启用交易所 | | `--symbols` | (从 DB 读取所有 active) | 限定交易对列表,逗号分隔 | | `--intervals` | (从 DB 读取 `kline_intervals`) | K 线周期,逗号分隔 | | `--start` | `last_backfill_time`(不低于 7 天前) | 补全起始时间 (ISO 格式);不填则使用 DB 中的 `last_backfill_time` | | `--end` | `Date.now()` | 补全结束时间 (ISO 格式) | | `--concurrency` | `2` | 并发任务数 | | `--batch-size` | `500` | 单次 REST 请求最大 K 线条数 | | `--dry-run` | `false` | 仅列出任务范围,不拉取不写入 | **执行流程**: ``` 1. 查询 trading_pairs 表(JOIN exchanges),获取 active=true 且 exchange.enabled=true 的交易对 2. 为每个交易对 × 每个 kline_interval 生成一个 BackfillTask: - startTime = --start ?? last_backfill_time(若 last_backfill_time 为 epoch 则兜底为 now-7d) - endTime = --end ?? now - 若 startTime >= endTime → 跳过(已是最新) 3. 按 exchange 分组,创建对应适配器实例 4. Semaphore 并发执行(默认 2): a. 按 batch-size 分段切分时间范围 b. 逐段调用适配器 fetchKlines() → 写入 klines 表(UPSERT) c. 记录本次拉取的最后一条 K 线时间 5. 所有任务完成后,更新每个交易对的 last_backfill_time ``` **并发策略**: - 不同(symbol, interval)任务之间并行执行 - 同一任务内部的多次分页请求串行执行(受 REST API 限频约束) - 单个任务失败不影响其他任务,失败数记录到最终统计 **last_backfill_time 更新逻辑**: ``` 任务完成后: pairLastTimes[pairId] = max(pairLastTimes[pairId], 本次拉取最后一条 K 线的 openTime) 最后统一: UPDATE trading_pairs SET last_backfill_time = pairLastTimes[id] ``` **注意事项**: - 未指定 `--symbols` 且非 `--dry-run` 时,走"全量增量"模式,覆盖所有 active 交易对 - 指定 `--start` 时不使用 `last_backfill_time`,但仍会更新 `last_backfill_time` 为实际拉取时间 - `--dry-run` 模式下不更新 `last_backfill_time` - 依赖交易所 REST API 限频,当前硬编码每次分页间隔 200ms(Binance) --- ## 5. 数据流生命周期 ``` 交易所 WebSocket │ ┌────────────┼────────────┐ ▼ ▼ ▼ Binance WS OKX WS Bybit WS │ │ │ └────────────┼────────────┘ │ ┌──────▼──────┐ │ RxJS 流 │ │ ticker$ │ │ trade$ │ │ orderbook$ │ └──────┬──────┘ │ ┌────────────┼────────────┐ ▼ ▼ ▼ K线合成器 数据清洗 格式转换 (时间桶) (异常过滤) (标准化) │ │ │ └────────────┼────────────┘ │ ┌────────────┼────────────┐ ▼ ▼ ┌──────────────────┐ ┌──────────────────┐ │ @timescaledb/ │ │ Redis Pub/Sub │ │ typeorm 批量写入 │ │ 实时行情发布 │ │ │ │ │ │ · Kline (1m) │ │ · ticker channel │ │ · Trade (逐笔) │ │ · kline channel │ │ │ │ · trade channel │ └────────┬──────────┘ └────────┬─────────┘ │ │ ▼ ▼ ┌──────────────────┐ ┌──────────────────┐ │ TimescaleDB │ │ Python 策略引擎 │ │ 连续聚合刷新 │ │ (消费 + 决策) │ │ · klines_5m │ │ │ │ · klines_15m │ │ │ │ · klines_1h │ │ │ │ · klines_1d │ │ │ └──────────────────┘ └──────────────────┘ ``` ### 启动时序 入口文件:[`run/main.ts`](run/main.ts) ``` 1. 加载配置(config/index.ts → 读取 env.yaml → 零依赖校验) 2. 初始化 Pino 日志 3. TypeORM DataSource.initialize() ├── 连接 PostgreSQL ├── 自动创建 TimescaleDB 扩展(如不存在) ├── 执行 Migration(生产环境) └── 注册 @timescaledb/typeorm 订阅者 4. 初始化 Redis 连接(ioredis) 5. 从数据库加载 TradingPair 列表(active = true) 6. 启动交易所适配器(并行) ├── connect() → WebSocket 连接 └── subscribeTicker/Trade(symbols) → RxJS Observable 7. 启动管道 ├── KlineSynthesizer 订阅 trade$ ├── DataCleaner 订阅合成输出 └── TimescaleDBWriter + RedisPublisher 订阅清洗输出 8. 注册 SIGTERM/SIGINT 优雅关闭 ``` --- ## 6. TypeORM + TimescaleDB 集成细节 ### 6.1 写入路径选择 | 场景 | 方案 | 原因 | |------|------|------| | **关系数据 CRUD**(交易对配置、交易所) | TypeORM Repository API | 标准 ORM 操作,事务支持,类型安全 | | **K 线批量写入**(高频) | TypeORM Repository `upsert()` + 批量 | TypeORM 的 `upsert` 方法支持 `ON CONFLICT DO UPDATE` | | **K 线海量写入**(极端场景) | 裸 `pg` Pool + `INSERT ... ON CONFLICT` | 绕过 ORM 开销,直接参数化 SQL,批量 500 条/次 | ```typescript // 方案 A: TypeORM Repository upsert(推荐日常使用) const klineRepo = AppDataSource.getRepository(Kline); await klineRepo.upsert(batchRecords, { conflictPaths: ["time", "exchange", "symbol", "interval"], skipUpdateIfNoValuesChanged: true, }); // 方案 B: 裸 pg 批量写入(极端性能场景) import { Pool } from "pg"; const pool = new Pool(pgsql); await pool.query(` INSERT INTO klines (...) VALUES ${values} ON CONFLICT (time, exchange, symbol, interval) DO UPDATE SET high = GREATEST(klines.high, EXCLUDED.high), low = LEAST(klines.low, EXCLUDED.low), close = EXCLUDED.close, volume = klines.volume + EXCLUDED.volume, updated_at = NOW() `, params); ``` ### 6.2 批量写入缓冲 ```typescript // data/pipeline/writer.ts export class KlineWriter { private buffer: Kline[] = []; private flushTimer: ReturnType; constructor( private readonly repo: Repository, private readonly batchSize = 500, private readonly flushIntervalMs = 1000, ) { // 定时刷新:即使未达阈值,也保证最大延迟 ≤ flushIntervalMs this.flushTimer = setInterval(() => this.flush(), this.flushIntervalMs); } async write(kline: Kline): Promise { this.buffer.push(kline); if (this.buffer.length >= this.batchSize) { await this.flush(); } } private async flush(): Promise { if (this.buffer.length === 0) return; const batch = this.buffer.splice(0, this.batchSize); await this.repo.upsert(batch, { conflictPaths: ["time", "exchange", "symbol", "interval"], }); logger.debug({ count: batch.length }, "Kline batch flushed"); } async destroy(): Promise { clearInterval(this.flushTimer); await this.flush(); // 最后刷新残留数据 } } ``` ### 6.3 TimescaleDB 特定配置 通过 `@timescaledb/typeorm` 在 DataSource 初始化时自动设置: ```typescript // db/data-source.ts 初始化后执行 await AppDataSource.initialize(); // @timescaledb/typeorm 会自动: // 1. CREATE EXTENSION IF NOT EXISTS timescaledb; // 2. 为 @Hypertable() 实体调用 create_hypertable() // 3. 为 @Compress() 实体调用 add_compression_policy() // 4. 为 @ContinuousAggregate() 实体创建物化视图 + 刷新策略 ``` 如果 `@timescaledb/typeorm` 的自动迁移不稳定,推荐 fallback 方案:在 `docker-compose.yml` 中挂载初始化脚本到 `db/init-db/`,容器启动时自动执行原始 SQL。 --- ## 7. 配置管理策略 ### 7.1 配置文件 项目根目录 [`env.yaml`](../../env.yaml) 为统一环境配置源,TypeScript(`data/config/`)和 Python 模块共享同一份 YAML。 ``` / ├── env.yaml ← 统一配置源(YAML,TS/Python 共享) └── data/ └── config/ ├── index.ts ← 读取 env.yaml → 校验 → 导出分组配置 └── validators.ts ← 零依赖运行时校验(assertString / assertPort / assertEnum) ``` ### 7.2 env.yaml 配置段 ```yaml # --- TimescaleDB / PostgreSQL --- db: host: localhost port: 5432 name: trade user: trader password: fucketh # --- Redis --- redis: url: redis://localhost:6379 publish_enabled: true # --- 日志 --- logging: level: debug # trace / debug / info / warn / error / fatal node_env: development # development / production / test ``` ### 7.3 使用方式 ```typescript // 任何模块中导入配置(所有导出对象均为强类型 as const) import { pgsql, redis, logging } from "../config"; // TypeORM DataSource const ds = new DataSource({ type: "postgres", ...pgsql }); // Redis 客户端 const redisClient = new Redis(redis.url); // 环境判断 if (logging.nodeEnv === "development") { logger.info("Config loaded"); } ``` --- ## 8. 日志与可观测性 ### 8.1 Pino 日志配置 ```typescript // data/logger.ts import pino from "pino"; import { logging } from "../config"; export const logger = pino({ level: logging.level, // 开发环境:使用 pino-pretty 彩色输出 // 生产环境:JSON 格式,便于 ELK / Loki 采集 ...(logging.pretty ? { transport: { target: "pino-pretty", options: { colorize: true } } } : {}), // 自动注入模块名 base: { module: "trade-data" }, // 序列化 Error 对象 serializers: { err: pino.stdSerializers.err, }, }); ``` ### 8.2 日志规范 | 级别 | 使用场景 | 示例 | |------|----------|------| | `trace` | 每条 Trade 的详细处理过程 | 开发调试用,生产关闭 | | `debug` | 批量写入、WS 心跳 | `"Kline batch flushed: count=500"` | | `info` | 模块启动/停止、连接建立 | `"Binance WS connected: symbols=15"` | | `warn` | 重连、数据异常、限频触发 | `"WS reconnecting: attempt=3, delay=12000ms"` | | `error` | 写入失败、WS 不可恢复 | `"TimescaleDB write failed: connection refused"` | | `fatal` | 进程即将退出 | `"Unrecoverable error, shutting down"` | > **安全原则**:严禁在日志中记录 API Key、Secret、数据库密码等敏感信息。使用 `logger.info({ apiKey: "***" })` 脱敏。 --- ## 9. 错误处理与容错 ### 9.1 错误分类与策略 | 错误类型 | 处理策略 | 恢复方式 | |----------|----------|----------| | **WebSocket 断线** | 指数退避重连(基数 3s,上限 10 次) | 自动重连 → 重新订阅 → 补齐缺失数据 | | **数据库写入失败** | 缓冲重试 3 次 → 丢弃并告警 | 下一批数据正常写入 | | **数据库连接断开** | pg Pool 自动重连 | TypeORM DataSource 重建 | | **Redis 发布失败** | 静默丢弃(发布是非关键路径) | 下一 tick 继续发布 | | **配置校验失败** | 进程退出(fail-fast) | 修正 .env 后重启 | | **交易所 API 限频** | 令牌桶限流,429 响应冷却 60s | 自动恢复 | ### 9.2 优雅关闭(Graceful Shutdown) ```typescript // data/run/main.ts async function shutdown(signal: string): Promise { logger.info({ signal }, "Shutting down"); // 1. 停止 WebSocket 连接(停止接收新数据) await Promise.all(adapters.map(a => a.disconnect())); // 2. 刷新 K 线写入缓冲区 await klineWriter.destroy(); // 3. 关闭 TypeORM DataSource await AppDataSource.destroy(); // 4. 关闭 Redis await redisPublisher.disconnect(); logger.info("Shutdown complete"); process.exit(0); } process.on("SIGTERM", () => shutdown("SIGTERM")); process.on("SIGINT", () => shutdown("SIGINT")); ``` ### 9.3 重试工具 [`utils/retry.ts`](utils/retry.ts) ```typescript // data/utils/retry.ts export async function withRetry( fn: () => Promise, options: { maxAttempts?: number; baseDelayMs?: number; onRetry?: (attempt: number, error: Error) => void; } = {}, ): Promise { const { maxAttempts = 3, baseDelayMs = 1000, onRetry } = options; for (let attempt = 1; attempt <= maxAttempts; attempt++) { try { return await fn(); } catch (err) { if (attempt === maxAttempts) throw err; const delay = baseDelayMs * Math.pow(2, attempt - 1); onRetry?.(attempt, err as Error); logger.warn({ attempt, delay, err }, "Retrying operation"); await sleep(delay); } } throw new Error("Unreachable"); } ``` --- ## 10. 性能考量 ### 10.1 关键性能指标 | 指标 | 目标 | 说明 | |------|------|------| | **单笔 Trade 处理延迟** | < 1 μs | 时间桶 HashMap 查找 + 更新 | | **WebSocket 消息吞吐** | > 10,000 msg/s | 单进程处理全部订阅对 | | **批量写入延迟** | < 100 ms | 500 条批量 UPSERT | | **内存占用(10 symbol × 5 周期)** | < 50 MB | 时间桶 + RxJS 缓存 | | **进程启动时间** | < 3 s | Bun 启动 + TypeORM 初始化 | ### 10.2 优化策略 | 策略 | 实现 | 收益 | |------|------|------| | **批量写入** | 500 条 / 1s 批量刷新 | 减少 99% DB 连接开销 | | **TimescaleDB 压缩** | 7 天后自动列式压缩 | 存储减少 90%+ | | **连续聚合** | 高周期从低周期视图读取 | 查询快 100×(1h K 线无需扫描 1m 表) | | **连接池复用** | TypeORM DataSource 管理 20 连接 | 防止连接数暴涨 | | **RxJS 操作符优化** | `share()` 多播 + `bufferTime()` 批量 | 避免重复计算 | | **JSON 序列化** | 生产环境考虑 MessagePack | 体积减少 30-50%,序列化快 2× | ### 10.3 磁盘空间估算 | 数据量(1m K 线) | 未压缩 | 压缩后(92%) | |-------------------|--------|-------------| | 10 币种 × 1 年 | ~945 MB | ~75 MB | | 50 币种 × 1 年 | ~4.7 GB | ~375 MB | | 200 币种 × 1 年 | ~18.9 GB | ~1.5 GB | --- ## 11. 开发工作流 ### 11.1 本地开发 ```bash # 1. 启动基础服务 docker compose up -d timescaledb # 2. 安装依赖 cd data && bun install # 3. 配置环境 # 编辑项目根目录 env.yaml(如不存在则创建) # 4. 启动数据模块 bun run run/main.ts # 5. 运行测试 bun test ``` ### 11.2 新增交易所适配器 ```bash # 1. 创建适配器文件 touch exchanges/new-exchange.ts # 2. 实现 MarketDataFeed 接口 # 3. 在 exchanges/types.ts 注册 # 4. 编写测试 tests/exchanges/new-exchange.test.ts # 5. 如需在数据库中配置,插入 exchanges 表和 trading_pairs 表 ``` ### 11.3 数据库迁移 ```bash # 生成迁移文件(开发环境) bunx typeorm migration:generate db/migrations/AddNewColumn -d db/data-source.ts # 执行迁移(生产环境) bunx typeorm migration:run -d db/data-source.ts # 回滚最近一次迁移 bunx typeorm migration:revert -d db/data-source.ts ``` ### 11.4 目录与文件命名规范 | 规范 | 说明 | |------|------| | 文件名 | `kebab-case.ts` | | 类名 | `PascalCase` | | 函数/变量 | `camelCase` | | 常量 | `UPPER_SNAKE_CASE` | | 类型/接口 | `PascalCase` | | 测试文件 | `*.test.ts`(与源文件同目录或 `tests/` 下镜像结构) | ### 11.5 数据补全 ```bash # 增量模式:为所有 active 交易对补齐最新数据 bun run run/exchange.ts --concurrency 2 # 仅检测需补全的任务范围 bun run run/exchange.ts --dry-run # 指定交易对增量补全 bun run run/exchange.ts --symbols BTCUSDT,ETHUSDT # 首次上线:拉取最近 90 天全量历史 bun run run/exchange.ts \ --symbols BTCUSDT \ --start "$(date -u -v-90d '+%Y-%m-%dT%H:%M:%SZ')" \ --intervals 1m,5m,15m,1h,4h,1d \ --concurrency 1 # cron 定时任务(每小时执行) 0 * * * * cd /app && bun run data/run/exchange.ts --concurrency 2 >> /var/log/backfill.log 2>&1 ``` --- ## 12. 风险提示 > ⚠️ **重要声明**:本数据模块为量化交易系统的技术基础设施,仅提供数据采集、存储与发布功能。任何基于本系统构建的交易策略,其盈亏风险由使用者自行承担。本系统作者不提供任何形式的投资建议,不对策略收益做任何承诺。 ### 技术风险 | 风险 | 影响 | 缓解措施 | |------|------|----------| | **交易所 API 变更** | 数据断流 | 适配器模式隔离,单个交易所变更不影响其他 | | **WebSocket 数据丢失** | K 线不完整 | REST 补拉机制 + gap 标记 | | **数据库写入瓶颈** | 数据积压 | 批量写入 + 连接池 + 监控告警 | | **Redis 宕机** | 策略引擎收不到实时数据 | Python 侧增加 TimescaleDB 直读 fallback | | **时钟偏差** | K 线时间戳错位 | NTP 时钟同步 + UTC 统一对齐 | ### 实盘接入前置条件 1. ✅ 模拟盘环境连续运行 ≥ 2 周无致命错误 2. ✅ 核心模块单元测试覆盖率 ≥ 80% 3. ✅ 数据库写入延迟 p99 < 200ms 4. ✅ WebSocket 断线自动恢复成功率 ≥ 99% 5. ✅ 日志采集与告警链路就绪 6. ✅ 资金隔离:不同策略使用独立子账户 7. ✅ 风控模块在 Python 侧就绪(熔断、仓位限制、最大回撤) --- ## 附录 A:docker-compose.yml 数据库配置参考 ```yaml # docker-compose.yml(项目根目录) services: timescaledb: image: timescale/timescaledb-ha:pg17.10-ts2.27.1 container_name: trade-timescaledb restart: unless-stopped ports: - "5432:5432" environment: POSTGRES_DB: trade POSTGRES_USER: trader POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme} volumes: - ./db/pgsql:/var/lib/postgresql # 数据持久化 - ./data/init-db:/docker-entrypoint-initdb.d # 自动执行建表 SQL(fallback) command: > -c shared_buffers=1GB -c effective_cache_size=3GB -c maintenance_work_mem=256MB -c work_mem=64MB -c wal_buffers=64MB -c random_page_cost=1.1 -c effective_io_concurrency=200 -c max_connections=50 healthcheck: test: ["CMD-SHELL", "pg_isready -U trader -d trade"] interval: 10s timeout: 5s retries: 5 ``` ## 附录 B:核心依赖版本锁定 | 包 | 版本 | 说明 | |----|------|------| | `typescript` | ^6.0 | 语言编译器 | | `bun` | ≥1.3 | 运行时 | | `typeorm` | ^1.0 | 关系数据 ORM | | `@timescaledb/typeorm` | ^0.0.1 | TimescaleDB TypeORM 扩展(实验性) | | `pg` | ^8.21 | PostgreSQL 原生驱动 | | `yaml` | ^2.9 | YAML 解析(env.yaml) | | `pino` | ^10.3 | 日志 | | `ioredis` | ^5.11 | Redis 客户端 | | `ccxt` | ^4.5 | 统一交易所 REST API | | `binance` | ^3.5 | Binance 官方 WebSocket SDK | | `ws` | ^8.21 | 通用 WebSocket 客户端 |