chore: 清偿技术债务 P2 区 #8 #9 #11 #12 #13 #14

- exchanges/index.ts 拆分为 rest-registry + ws-manager,降级为 re-export hub
- 移除 fetchMarkets() 及 MarketInfo 类型(交易对人工筛选,无需自动注册)
- 移除 ConnectionState 死代码
- aggregate.ts: console.log → logger.debug 统一日志
- 删除 data/test.ts(SDK 临时调试脚本)
- ARCHITECTURE.md 更新反映当前简化架构
- #12 写入缓冲暂不实施:8 交易对瞬时 8 tps,无压力
This commit is contained in:
Rekey
2026-06-18 20:33:23 +08:00
parent 34632b6c33
commit 0a6198b0a2
14 changed files with 244 additions and 758 deletions
+1 -1
View File
@@ -61,7 +61,7 @@ data/ engine/
- ✅ TS 数据模块:配置加载、TypeORM 实体、Binance REST K 线拉取+UPSERT、连续聚合刷新。
- ❌ 未实现:WebSocket 行情、K 线合成管道、Redis 发布、策略管理器、信号总线、回测引擎、参数优化器、风控、交易执行、API 网关。
- ❌ 无测试、无 CI。
- `data/run/main.ts` 不存在`dev`/`start` 脚本指向未创建文件
- ✅ WebSocket 实时行情已实现(`run/start.ts``dev`/`start`/`ws` 脚本指向该入口
## 注意事项
+93 -491
View File
@@ -14,12 +14,10 @@
4. [核心模块设计](#4-核心模块设计)
- [4.1 配置模块 (`config/`)](#41-配置模块-config)
- [4.2 TypeORM 数据源 (`db/`)](#42-typeorm-数据源-db)
- [4.3 TimescaleDB K 线实体 (`db/entities/klines/`)](#43-timescaledb-k-线实体-dbentitiesklines)
- [4.3 TimescaleDB K 线实体](#43-timescaledb-k-线实体)
- [4.4 关系数据实体 (`db/entities/`)](#44-关系数据实体-dbentities)
- [4.5 交易所适配器 (`exchanges/`)](#45-交易所适配器-exchanges)
- [4.6 K 线合成管道 (`pipeline/`)](#46-k-线合成管道-pipeline)
- [4.7 数据发布 (`publisher/`)](#47-数据发布-publisher)
- [4.8 数据补全服务 (`run/exchange.ts`)](#48-数据补全服务-runexchangets)
- [4.6 数据补全服务 (`run/exchange.ts`)](#46-数据补全服务-runexchangets)
5. [数据流生命周期](#5-数据流生命周期)
6. [TypeORM + TimescaleDB 集成细节](#6-typeorm--timescaledb-集成细节)
7. [配置管理策略](#7-配置管理策略)
@@ -89,43 +87,34 @@
## 2. 整体架构
```
┌──────────────────────────────────────────────────────────────────────
Trade Data Module (Bun + TS) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ │ Binance │ │ OKX │ │ Bybit │ ← 交易所 WebSocket 适配器
│ │ Adapter │ │ Adapter │ │ Adapter
│ └─────────┘ └─────────┘ └────┬─────┘
└────────────────────────────
┌────────────────────────────────────────┐
│ 统一行情事件流(RxJS) │
ticker$ │ trade$ │ orderbook$ │
└──────────────────┬─────────────────────┘
┌────────────────────────────────────────┐
│ K 线合成管道(时间桶算法) │
│ │ Trade → 1m → 5m → 15m → 1h → 4h → 1d │ │
└────────┬───────────────────┬───────────┘
▼ ▼
────────────────┐ ┌──────────────────
│ TimescaleDB Redis Pub/Sub
│ @timescaledb/ │ │ (ioredis) │
│ │ typeorm 写入 行情实时发布
└───────┬────────┘ └────────┬─────────┘
│ │
┌───────▼────────┐ ┌───────▼─────────┐
│ │ 连续聚合视图 │ │ Python 策略引擎 │ │
│ │ (自动刷新) │ │ (消费实时行情) │ │
│ └────────────────┘ └─────────────────┘ │
│ │
│ ┌────────────────────────────────────────┐ │
│ │ TypeORM 实体(关系数据) │ │
│ │ Exchange / TradingPair / Symbol / │ │
│ │ DataSource / Subscription │ │
│ └────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ Trade Data Module (Bun + TS) │
│ │
│ ┌──────────────────┐ ┌──────────────────┐
│ │ Binance REST │── backfill ──────→│ TimescaleDB │
│ │ (官方 SDK) │ Klines (1m) │
│ └──────────────────┘ └────────┬─────────┘
────────────────── Bus Events
│ Binance WS │── kline:update ────→ upsert─┘
│ (官方 SDK) │ ↓
└────────┬─────────┘ kline:saved ────→ 连续聚合刷新
│ pair:ready ← backfill 完成后触发
│ ws:disconnected / ws:connected / ws:stale
┌────────┴──────────────────────────┐
│ │ run/start.ts(编排层)
│ 注册 bus 监听 → backfillKlines() │
→ WS 订阅 → 入库 → 聚合刷新 │
───────────────────────────────────┘
┌───────────────────────────────────┐
│ │ TypeORM 实体(关系数据) │ │
│ Exchange / TradingPair │
└───────────────────────────────────┘
└──────────────────────────────────────────────────────────────────┘
```
---
@@ -140,7 +129,7 @@ data/
├── bun.lock # Bun 依赖锁定文件
├── run/ # 启动文件
│ ├── main.ts # 模块入口:配置加载 → DB 连接 → Redis 连接 → 适配器启动 → 优雅关闭
│ ├── start.ts # 模块入口:注册 bus 事件监听 → 回补 → WS 订阅 → 入库 → 聚合刷新 → 优雅关闭
│ └── exchange.ts # 数据补全服务:读取 trading_pairs.last_backfill_time → 拉取缺失 K 线 → 批量写入 → 更新时间戳
├── config/ # 中心化配置模块(目录)
@@ -160,8 +149,6 @@ data/
│ └── migrations/ # TypeORM 迁移文件(自动生成)
│ └── .gitkeep
├── exchanges/ # 交易所适配器(待实现)
├── pipeline/ # K 线合成管道(待实现)
├── publisher/ # Redis 数据发布(待实现)
├── types/ # 共享类型定义
├── utils/ # 工具函数
└── tests/ # 测试
@@ -257,12 +244,14 @@ await AppDataSource.initialize();
---
### 4.3 TimescaleDB K 线实体 [`db/entities/klines/`](db/entities/klines/)
### 4.3 TimescaleDB K 线实体
#### [`kline.entity.ts`](db/entities/klines/kline.entity.ts) — 1m K 线 Hypertable
> **状态:** K 线实体仅有 `kline.entity.ts`1m),位于 `db/entities/` 扁平目录下。本模块不涉及 1m 以上周期数据的读写,高周期 K 线(5m / 15m / 1h / 4h / 1d)由 `run/build_aggregates_sql.ts` 通过应用层 SQL 连续聚合视图刷新,不依赖 TimescaleDB 的 `@ContinuousAggregate` 自动聚合。
#### `kline.entity.ts` — 1m K 线 Hypertable
```typescript
// data/db/entities/klines/kline.entity.ts
// data/db/entities/kline.entity.ts
import {
Hypertable,
TimeColumn,
@@ -363,107 +352,12 @@ export class Kline {
> | `@TimeColumn()` | 标记时间分区列 | `-- time 列` |
> | `@PartitionColumn()` | 标记空间分区列 | `partitioning_column =>` |
> | `@Compress()` | 配置自动压缩策略 | `ALTER TABLE SET (timescaledb.compress)` + `add_compression_policy()` |
> | `@ContinuousAggregate()` | 声明连续聚合视图(见下文) | `CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous)` |
#### [`trade.entity.ts`](db/entities/klines/trade.entity.ts) — 逐笔成交 Hypertable
```typescript
// data/db/entities/klines/trade.entity.ts
/**
* 逐笔成交记录 Hypertable
*
* 存储交易所推送的每笔撮合成交,是 K 线合成器的数据源。
* 数据量极大(BTCUSDT 高峰 >100tps),保留策略设为 3 天。
*/
@Hypertable({
chunk_time_interval: "1 hour", // 高频写入,chunk 更细
partitioning_column: "exchange",
number_partitions: 4,
if_not_exists: true,
})
@Compress({
segmentby: "exchange, symbol",
orderby: "trade_time DESC",
compress_after: "1 day", // 1 天后即压缩
})
export class Trade {
@TimeColumn()
@PrimaryColumn("timestamptz")
trade_time!: Date;
@PartitionColumn()
@Column("text")
exchange!: string;
@Column("text")
symbol!: string;
@Column("numeric", { precision: 20, scale: 8 })
price!: number;
@Column("numeric", { precision: 20, scale: 8 })
amount!: number; // 成交数量(base 币种)
@Column("numeric", { precision: 20, scale: 8 })
quote_amount!: number; // 成交额(quote 币种)
@Column("boolean")
is_buyer_maker!: boolean; // true = 主动卖出, false = 主动买入
@Column("bigint", { nullable: true })
trade_id?: number; // 交易所成交 ID
@CreateDateColumn("timestamptz")
created_at!: Date;
}
```
#### 连续聚合声明(5m / 15m / 1h / 1d
`@timescaledb/typeorm` 支持通过实体类声明连续聚合,替代手动编写 SQL 物化视图:
```typescript
// data/db/entities/klines/kline-1h.entity.ts(示意)
import { ContinuousAggregate } from "@timescaledb/typeorm";
/**
* 1h K 线连续聚合视图
*
* 从 klines (1m) 表自动聚合,刷新策略:每 5 分钟刷新最近 1 天数据。
* 应用程序查询时自动路由到该视图,无需扫描 1m 表。
*/
@ContinuousAggregate({
source: () => Kline, // 源实体(1m K 线)
bucket_interval: "1 hour", // 聚合时间桶
refresh: {
start_offset: "3 days",
end_offset: "1 hour", // 留 1 小时给延迟数据
schedule_interval: "5 minutes",
},
})
export class Kline1h {
@TimeColumn()
@PrimaryColumn("timestamptz")
time!: Date;
@Column("text") exchange!: string;
@Column("text") symbol!: string;
// 聚合字段(由 timescaledb 自动填充)
open!: number;
high!: number;
low!: number;
close!: number;
volume!: number;
quote_volume!: number;
trade_count!: number;
}
```
> **注意**`@timescaledb/typeorm` 的 `@ContinuousAggregate` 装饰器目前处于实验阶段(v0.0.1)。生产环境中建议同时保留 `db/init-db/` 下的原始 SQL 建表脚本作为 fallback,通过 Docker 的 `/docker-entrypoint-initdb.d/` 自动执行。
> **本模块仅 `kline.entity.ts`1m)一个 K 线实体。** 高周期 K 线(5m / 15m / 1h / 4h / 1d)由 `run/build_aggregates_sql.ts` 通过应用层 SQL 刷新连续聚合视图(`klines_5m` / `klines_15m` 等),不创建独立实体,刷新触发由 `run/start.ts` 中 `kline:saved` 事件驱动。
---
### 4.4 关系数据实体 [`db/entities/`](db/entities/)
这些实体由标准 TypeORM 管理,存储业务配置数据。
@@ -566,8 +460,10 @@ export class SymbolEntity {
### 4.5 交易所适配器 [`exchanges/`](exchanges/)
> **状态:远期设计。** `MarketDataFeed` 接口仅为架构示意,`types/base.ts` 中已移除。当前未实现任何适配器的完整接口,WS 直接经 `exchanges/binance/ws.ts` 推送 bus 事件。
```typescript
// data/exchanges/types.ts
// ARCHITECTURE.md 中的设计示意(非实际文件)
import type { Observable } from "rxjs";
/** 统一行情数据源接口 —— 所有交易所适配器必须实现 */
@@ -645,214 +541,7 @@ export abstract class BaseExchangeAdapter implements MarketDataFeed {
---
### 4.6 K 线合成管道 [`pipeline/`](pipeline/)
#### 时间桶算法(核心)
```
每收到一条 Trade
1. 计算桶索引: bucketIndex = floor(timestamp / intervalMs)
2. 若 bucketIndex ≠ 当前桶:
a. emit 当前桶 K 线(已关闭)
b. 创建新桶,O=H=L=C=price, V=amount
3. 若仍在当前桶:
a. H = max(H, price)
b. L = min(L, price)
c. C = price
d. V += amount
```
#### 多周期合成策略(混合方案)
```
Trade 流 ────► 1m 合成器(时间桶,数据源= Trade)
▼ 输出 1m K 线
┌─────────────┐
│ 1m → 5m 聚合 │ ← 5 根 1m → 1 根 5m
└──────┬──────┘
┌──────────────┐
│ 5m → 15m 聚合 │ ← 3 根 5m → 1 根 15m
└──────┬───────┘
┌──────────────┐
│ 15m → 1h 聚合 │ ← 4 根 15m → 1 根 1h
└──────────────┘
```
> **设计理由**:1m 用 Trade 实时合成(精度最高),5m+ 从低周期聚合(计算量最小)。避免为每个周期维护独立的 Trade 缓存造成内存压力。
#### 核心代码骨架
```typescript
// data/pipeline/kline-synthesizer.ts
import { Subject, interval } from "rxjs";
import type { Trade, Kline, KlineInterval } from "../types/market";
const INTERVAL_MS: Record<KlineInterval, number> = {
"1m": 60_000, "5m": 300_000, "15m": 900_000,
"30m": 1_800_000, "1h": 3_600_000, "4h": 14_400_000,
"1d": 86_400_000, "1w": 604_800_000,
};
interface Bucket {
openTime: number;
open: number;
high: number;
low: number;
close: number;
volume: number;
tradeCount: number;
isClosed: boolean;
}
export class KlineSynthesizer {
private buckets = new Map<string, Bucket>();
/**
* 输入一条 Trade,产出可能的闭合 K 线。
* 返回 null 表示当前桶未关闭,暂不产出。
*/
synthesize(
exchange: string,
symbol: string,
interval: KlineInterval,
trade: Trade,
): Kline | null {
const intervalMs = INTERVAL_MS[interval];
const bucketTime = Math.floor(trade.timestamp / intervalMs) * intervalMs;
const key = `${exchange}:${symbol}:${interval}:${bucketTime}`;
const existing = this.buckets.get(key);
if (!existing) {
this.buckets.set(key, {
openTime: bucketTime,
open: trade.price,
high: trade.price,
low: trade.price,
close: trade.price,
volume: trade.amount,
tradeCount: 1,
isClosed: false,
});
return null; // 新桶第一条,不产出
}
existing.high = Math.max(existing.high, trade.price);
existing.low = Math.min(existing.low, trade.price);
existing.close = trade.price;
existing.volume += trade.amount;
existing.tradeCount += 1;
return null;
}
/** 定时关闭过期桶,返回已闭合 K 线列表 */
closeExpired(now: number): Kline[] {
const closed: Kline[] = [];
for (const [key, bucket] of this.buckets.entries()) {
const intervalMs = INTERVAL_MS["1m"]; // 仅 1m 合成器有此方法
if (!bucket.isClosed && now >= bucket.openTime + intervalMs + 15000) {
bucket.isClosed = true;
closed.push(/* 构建 Kline 对象 */);
this.buckets.delete(key);
}
}
return closed;
}
/** 从 1m K 线聚合为高周期 K 线 */
static aggregate(
klines: Kline[],
targetInterval: KlineInterval,
): Kline[] {
const intervalMs = INTERVAL_MS[targetInterval];
const groups = new Map<number, Kline[]>();
for (const k of klines) {
const bucketTime = Math.floor(k.openTime / intervalMs) * intervalMs;
const group = groups.get(bucketTime) ?? [];
group.push(k);
groups.set(bucketTime, group);
}
return Array.from(groups.entries()).map(([bucketTime, group]) => ({
openTime: bucketTime,
closeTime: bucketTime + intervalMs,
open: group[0]!.open,
high: Math.max(...group.map(k => k.high)),
low: Math.min(...group.map(k => k.low)),
close: group[group.length - 1]!.close,
volume: group.reduce((sum, k) => sum + k.volume, 0),
// ...
}));
}
}
```
#### 数据清洗 [`cleaner.ts`](pipeline/cleaner.ts)
| 清洗规则 | 触发条件 | 处理方式 |
|----------|----------|----------|
| 价格异常 | `|price - median| > 10 * MAD` | 丢弃该条 Trade |
| 空成交量 | `volume === 0 && tradeCount === 0` | 用前一根 K 线的 Close 填充(平盘 K 线) |
| 时间戳乱序 | `trade.timestamp < lastTimestamp` | 100ms 排序窗口内重排,超时则丢弃 |
| 断线缺失 | 连续 5 分钟无数据 | 标记 `gap: true`,策略端可选择跳过 |
---
### 4.7 数据发布 [`publisher/`](publisher/)
```typescript
// data/publisher/redis.ts
import Redis from "ioredis";
import { redis as redisConfig } from "../../config";
export class RedisPublisher {
private client: Redis;
private readonly prefix: string;
constructor() {
this.prefix = redisConfig.channelPrefix; // "trade"
this.client = new Redis(redisConfig.url, {
retryStrategy: (times) => {
if (times > redisConfig.maxRetries) return null; // 停止重试
return redisConfig.retryDelayBaseMs * Math.pow(2, times);
},
});
}
/** 发布实时 Ticker */
async publishTicker(exchange: string, symbol: string, data: Ticker): Promise<void> {
if (!redisConfig.publishEnabled) return;
const channel = `${this.prefix}:market:ticker:${exchange}:${symbol}`;
await this.client.publish(channel, JSON.stringify(data));
}
/** 发布 K 线(增量 — 只推送最新一根的 OHLCV 变化) */
async publishKlineDelta(
exchange: string, symbol: string, interval: string, delta: KlineDelta,
): Promise<void> {
if (!redisConfig.publishEnabled) return;
const channel = `${this.prefix}:market:kline:${exchange}:${symbol}:${interval}`;
await this.client.publish(channel, JSON.stringify(delta));
}
}
```
**Redis Pub/Sub 频道命名规范**
| 频道模式 | 说明 | 消费方 |
|----------|------|--------|
| `trade:market:ticker:{exchange}:{symbol}` | 实时 Ticker | Python 策略引擎 |
| `trade:market:kline:{exchange}:{symbol}:{interval}` | K 线增量更新 | Python 策略引擎 |
| `trade:market:trade:{exchange}:{symbol}` | 逐笔成交 | Python 策略引擎 |
| `trade:system:heartbeat:data` | 数据模块心跳 | 监控告警 |
| `trade:system:error:data` | 数据模块异常 | 监控告警 |
---
### 4.8 数据补全服务 [`run/exchange.ts`](run/exchange.ts)
### 4.6 数据补全服务 [`run/exchange.ts`](run/exchange.ts)
独立启动的数据补全服务,从 `trading_pairs` 表中读取每个交易对的 `last_backfill_time`,据此确定需要拉取的历史 K 线范围,补全完成后将 `last_backfill_time` 更新为最新时间点。
@@ -961,72 +650,56 @@ bun run run/exchange.ts --dry-run
## 5. 数据流生命周期
```
交易所 WebSocket
┌────────────┼────────────┐
▼ ▼ ▼
Binance WS OKX WS Bybit WS
└────────────┼────────────
──────▼──────
│ RxJS 流
│ ticker$
│ trade$ │
│ orderbook$ │
└──────┬──────┘
┌────────────┼────────────┐
▼ ▼ ▼
K线合成器 数据清洗 格式转换
(时间桶) (异常过滤) (标准化)
└────────────┼────────────┘
────────────────────────
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ @timescaledb/ │ │ Redis Pub/Sub
│ typeorm 批量写入 实时行情发布
│ │ │ │
│ · Kline (1m) │ │ · ticker channel │
│ · Trade (逐笔) │ │ · kline channel │
│ │ │ · trade channel │
└────────┬──────────┘ └────────┬─────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ TimescaleDB │ │ Python 策略引擎 │
│ 连续聚合刷新 │ │ (消费 + 决策) │
│ · klines_5m │ │ │
│ · klines_15m │ │ │
│ · klines_1h │ │ │
│ · klines_1d │ │ │
└──────────────────┘ └──────────────────┘
┌──────────────────┐
│ Binance REST │── backfill ────→┌──────────────────┐
│ (官方 SDK) │ │ TimescaleDB │
└──────────────────┘ │ Klines (1m) │
└────────┬─────────┘
┌──────────────────┐
│ Binance WS │── kline:update ──────────
│ (官方 SDK) │ │
└─────────────────┘ ▼ │
│ upsertOrUpdateKlines
│ │
│ ▼ ▼
│ 连续聚合刷新 ←── kline:saved
│ (klines_5m/15m/1h/...)
│ pair:ready ← backfill 完成后触发
│ ws:disconnected / ws:connected / ws:stale
┌────────┴──────────────────────────┐
run/start.ts(编排层)
│ 注册 bus 监听 → backfillKlines() │
│ → WS 订阅 → 入库 → 聚合刷新
└───────────────────────────────────┘
┌───────────────────────────────────┐
│ TypeORM 实体(关系数据)
Exchange / TradingPair
└───────────────────────────────────┘
```
### 启动时序
入口文件:[`run/main.ts`](run/main.ts)
入口文件:[`run/start.ts`](run/start.ts)(编排层,不直接操作 WS 实例、不写库、不刷新聚合)
```
1. 加载配置(config/index.ts → 读取 env.yaml → 零依赖校验
2. 初始化 Pino 日志
3. TypeORM DataSource.initialize()
── 连接 PostgreSQL
├── 自动创建 TimescaleDB 扩展(如不存在)
├── 执行 Migration(生产环境
└── 注册 @timescaledb/typeorm 订阅者
4. 初始化 Redis 连接(ioredis
5. 从数据库加载 TradingPair 列表(active = true
6. 启动交易所适配器(并行)
├── connect() → WebSocket 连接
└── subscribeTicker/Trade(symbols) → RxJS Observable
7. 启动管道
├── KlineSynthesizer 订阅 trade$
├── DataCleaner 订阅合成输出
└── TimescaleDBWriter + RedisPublisher 订阅清洗输出
8. 注册 SIGTERM/SIGINT 优雅关闭
1. 模块 import 阶段(隐式初始化
├── config/index.ts 读取 env.yaml → 零依赖校验
├── utils/logger.ts 初始化 Pino
── db/data-source.ts TypeORM DataSource.initialize()
├── 连接 PostgreSQL
└── 自动创建 TimescaleDB 扩展(如不存在
2. 注册 bus 事件监听(桥接各 service 模块)
├── pair:ready → watchKline(订阅 WS Kline 流
├── kline:update → upsertOrUpdateKlines + updatePairWsTime → emit kline:saved
├── kline:saved → checkAndRefresh(连续聚合刷新) → emit aggregate:refreshed
├── backfill:complete → 统计输出 / 失败退出
└── ws:disconnected / ws:connected / ws:stale → 断线重连检测(5 分钟观察窗口)
3. 调起 backfillKlines()(逐个 pair 回补,自动 emit pair:ready / backfill:complete
4. 所有交易对就绪后,WS 实时行情持续运行
5. 注册 SIGTERM/SIGINT → AppDataSource.destroy() → process.exit
```
---
@@ -1063,46 +736,6 @@ await pool.query(`
`, params);
```
### 6.2 批量写入缓冲
```typescript
// data/pipeline/writer.ts
export class KlineWriter {
private buffer: Kline[] = [];
private flushTimer: ReturnType<typeof setInterval>;
constructor(
private readonly repo: Repository<Kline>,
private readonly batchSize = 500,
private readonly flushIntervalMs = 1000,
) {
// 定时刷新:即使未达阈值,也保证最大延迟 ≤ flushIntervalMs
this.flushTimer = setInterval(() => this.flush(), this.flushIntervalMs);
}
async write(kline: Kline): Promise<void> {
this.buffer.push(kline);
if (this.buffer.length >= this.batchSize) {
await this.flush();
}
}
private async flush(): Promise<void> {
if (this.buffer.length === 0) return;
const batch = this.buffer.splice(0, this.batchSize);
await this.repo.upsert(batch, {
conflictPaths: ["time", "exchange", "symbol", "interval"],
});
logger.debug({ count: batch.length }, "Kline batch flushed");
}
async destroy(): Promise<void> {
clearInterval(this.flushTimer);
await this.flush(); // 最后刷新残留数据
}
}
```
### 6.3 TimescaleDB 特定配置
通过 `@timescaledb/typeorm` 在 DataSource 初始化时自动设置:
@@ -1115,7 +748,7 @@ await AppDataSource.initialize();
// 1. CREATE EXTENSION IF NOT EXISTS timescaledb;
// 2. 为 @Hypertable() 实体调用 create_hypertable()
// 3. 为 @Compress() 实体调用 add_compression_policy()
// 4. @ContinuousAggregate() 实体创建物化视图 + 刷新策略
// 4. 连续聚合视图由 run/build_aggregates_sql.ts 手动管理,不走 @ContinuousAggregate
```
如果 `@timescaledb/typeorm` 的自动迁移不稳定,推荐 fallback 方案:在 `docker-compose.yml` 中挂载初始化脚本到 `db/init-db/`,容器启动时自动执行原始 SQL。
@@ -1235,7 +868,7 @@ export const logger = pino({
### 9.2 优雅关闭(Graceful Shutdown
```typescript
// data/run/main.ts
// data/run/start.ts
async function shutdown(signal: string): Promise<void> {
logger.info({ signal }, "Shutting down");
@@ -1259,35 +892,6 @@ process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
```
### 9.3 重试工具 [`utils/retry.ts`](utils/retry.ts)
```typescript
// data/utils/retry.ts
export async function withRetry<T>(
fn: () => Promise<T>,
options: {
maxAttempts?: number;
baseDelayMs?: number;
onRetry?: (attempt: number, error: Error) => void;
} = {},
): Promise<T> {
const { maxAttempts = 3, baseDelayMs = 1000, onRetry } = options;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (err) {
if (attempt === maxAttempts) throw err;
const delay = baseDelayMs * Math.pow(2, attempt - 1);
onRetry?.(attempt, err as Error);
logger.warn({ attempt, delay, err }, "Retrying operation");
await sleep(delay);
}
}
throw new Error("Unreachable");
}
```
---
## 10. 性能考量
@@ -1299,7 +903,7 @@ export async function withRetry<T>(
| **单笔 Trade 处理延迟** | < 1 μs | 时间桶 HashMap 查找 + 更新 |
| **WebSocket 消息吞吐** | > 10,000 msg/s | 单进程处理全部订阅对 |
| **批量写入延迟** | < 100 ms | 500 条批量 UPSERT |
| **内存占用(10 symbol × 5 周期)** | < 50 MB | 时间桶 + RxJS 缓存 |
| **内存占用(10 symbol × 5 周期)** | < 50 MB | 时间桶缓存 |
| **进程启动时间** | < 3 s | Bun 启动 + TypeORM 初始化 |
### 10.2 优化策略
@@ -1310,7 +914,6 @@ export async function withRetry<T>(
| **TimescaleDB 压缩** | 7 天后自动列式压缩 | 存储减少 90%+ |
| **连续聚合** | 高周期从低周期视图读取 | 查询快 100×(1h K 线无需扫描 1m 表) |
| **连接池复用** | TypeORM DataSource 管理 20 连接 | 防止连接数暴涨 |
| **RxJS 操作符优化** | `share()` 多播 + `bufferTime()` 批量 | 避免重复计算 |
| **JSON 序列化** | 生产环境考虑 MessagePack | 体积减少 30-50%,序列化快 2× |
### 10.3 磁盘空间估算
@@ -1352,10 +955,10 @@ export async function withRetry<T>(
| 数据类型 | 频率 | 写入方式 |
|---------|------|---------|
| 1m K 线 | 每交易对每分钟 1 条 | KlineWriter 500 条/1s 批量缓冲 |
| 1m K 线 | 每交易对每分钟 1 条 | `upsertOrUpdateKlines` 单条写入 |
| 逐笔成交 (Trade) | 高峰 >100 tps/交易对 | 批量写入(待实现) |
100 交易对场景下 K 线写入量 ≈ 100 条/分钟,远低于单次批量阈值(500 条),实际由 1s 定时器触发刷新。不存在写入瓶颈,P2 #12(无批量写入缓冲)在该规模以下可降级评估
100 交易对场景下 K 线写入量 ≈ 100 条/分钟,不存在写入瓶颈
#### 内存
@@ -1363,7 +966,6 @@ export async function withRetry<T>(
|------|---------|-----------|
| K 线时间桶 (1m) | ~200 bytes/桶 | 100 × 200B = 20 KB |
| 5m/15m/1h/4h/1d 聚合桶 | ~200 bytes/桶/周期 | 100 × 5 × 200B = 100 KB |
| RxJS Subject 内部缓存 | 背压控制,可忽略 | < 1 MB |
总内存占用 < 10 MB(不含 Bun 运行时和 TypeORM 连接池),远低于 §10.1 指标目标(< 50 MB)。
@@ -1384,7 +986,7 @@ cd data && bun install
# 编辑项目根目录 env.yaml(如不存在则创建)
# 4. 启动数据模块
bun run run/main.ts
bun run run/start.ts
# 5. 运行测试
bun test
@@ -1397,7 +999,7 @@ bun test
touch exchanges/new-exchange.ts
# 2. 实现 MarketDataFeed 接口
# 3. 在 exchanges/types.ts 注册
# 3. 在 types/base.ts 注册类型,在 exchanges/index.ts 注册适配器
# 4. 编写测试 tests/exchanges/new-exchange.test.ts
# 5. 如需在数据库中配置,插入 exchanges 表和 trading_pairs 表
```
-3
View File
@@ -12,7 +12,6 @@
"pino": "^10.3.1",
"pino-pretty": "^13.1.3",
"pino-roll": "^4.0.0",
"rxjs": "^7.8.2",
"typeorm": "^1.0.0",
"yaml": "^2.9.0",
},
@@ -761,8 +760,6 @@
"rolldown": ["rolldown@1.0.3", "", { "dependencies": { "@oxc-project/types": "=0.133.0", "@rolldown/pluginutils": "^1.0.0" }, "optionalDependencies": { "@rolldown/binding-android-arm64": "1.0.3", "@rolldown/binding-darwin-arm64": "1.0.3", "@rolldown/binding-darwin-x64": "1.0.3", "@rolldown/binding-freebsd-x64": "1.0.3", "@rolldown/binding-linux-arm-gnueabihf": "1.0.3", "@rolldown/binding-linux-arm64-gnu": "1.0.3", "@rolldown/binding-linux-arm64-musl": "1.0.3", "@rolldown/binding-linux-ppc64-gnu": "1.0.3", "@rolldown/binding-linux-s390x-gnu": "1.0.3", "@rolldown/binding-linux-x64-gnu": "1.0.3", "@rolldown/binding-linux-x64-musl": "1.0.3", "@rolldown/binding-openharmony-arm64": "1.0.3", "@rolldown/binding-wasm32-wasi": "1.0.3", "@rolldown/binding-win32-arm64-msvc": "1.0.3", "@rolldown/binding-win32-x64-msvc": "1.0.3" }, "bin": { "rolldown": "./bin/cli.mjs" } }, "sha512-i00lAJ2ks1BYr7rjNjKC7BcqAS7nVfiT3QX1SI5aY+AFHblCmaUf9OE9dbdzDvW6dJxbi2ZCZiy9v3CcwOiX3g=="],
"rxjs": ["rxjs@7.8.2", "", { "dependencies": { "tslib": "^2.1.0" } }, "sha512-dhKf903U/PQZY6boNNtAGdWbG85WAbjT/1xYoZIC7FAY0yWapOBQVsVrDl58W86//e1VpMNBtRV4MaXfdMySFA=="],
"safe-buffer": ["safe-buffer@5.2.1", "", {}, "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ=="],
"safe-stable-stringify": ["safe-stable-stringify@2.5.0", "", {}, "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA=="],
+1 -10
View File
@@ -7,13 +7,12 @@
//
// 子类只需实现:
// - fetchKlines() — 历史 K 线拉取(基于目标交易所 SDK)
// - fetchMarkets() — 交易对元数据拉取(用于自动注册交易对)
//
// WebSocket 实时行情由各适配器自行管理,不在此基类范围内。
// ============================================================
import { logger } from "../utils/logger";
import type { Kline, MarketInfo, FetchKlinesParams, RestClientConfig } from "../types";
import type { Kline, FetchKlinesParams, RestClientConfig } from "../types";
import { DEFAULT_REST_CONFIG } from "../types/base";
// ============================================================
@@ -86,14 +85,6 @@ export abstract class BaseRestClient {
* @param params - fetchKlines 统一参数对象
*/
abstract fetchKlines(params: FetchKlinesParams): Promise<Kline[]>;
/**
* 获取交易所交易对信息(REST)。
*
* 子类负责调用交易所原生 SDK 的 /exchangeInfo 等价接口,
* 并转换为本系统标准化 MarketInfo 结构。
*/
abstract fetchMarkets(): Promise<MarketInfo[]>;
}
export default BaseRestClient;
+1 -10
View File
@@ -3,7 +3,7 @@ import { MainClient, USDMClient, type Kline as BinanceRestKline } from "binance"
import { logger } from "../../utils/logger";
import { exchange } from "../../config";
import { BaseRestClient } from "../base";
import type { Kline, MarketInfo, KlineInterval, PairType, FetchKlinesParams } from "../../types";
import type { Kline, KlineInterval, PairType, FetchKlinesParams } from "../../types";
// ============================================================
// RestError — REST 请求统一错误类型
@@ -192,11 +192,6 @@ export class BinanceRestClient extends BaseRestClient {
.map((k) => convertBinanceKline(k, symbol, "1m", type))
.sort((a, b) => a.openTime - b.openTime);
}
async fetchMarkets(): Promise<MarketInfo[]> {
// TODO: 调用 Binance /exchangeInfo 接口
return [];
}
}
// ============================================================
@@ -259,8 +254,4 @@ export class BinanceFuturesRestClient extends BaseRestClient {
.map((k) => convertBinanceKline(k, symbol, "1m", type))
.sort((a, b) => a.openTime - b.openTime);
}
async fetchMarkets(): Promise<MarketInfo[]> {
return [];
}
}
+9 -117
View File
@@ -1,121 +1,13 @@
import { BaseRestClient, BaseWsClient } from "./base";
import { BinanceRestClient, BinanceFuturesRestClient } from "./binance/rest";
import { BinanceWsClient } from "./binance/ws";
import type { FetchKlinesParams, Kline, PairType } from "../types";
/** 交易所 ID 到 RestClient 构造器的注册表 */
const registry: Record<string, () => BaseRestClient> = {
binance: () => new BinanceRestClient(),
binance_futures: () => new BinanceFuturesRestClient(),
};
/**
* 创建交易所 REST 客户端。
*
* 根据交易所 ID 返回对应的 RestClient 实例,
* 外部无需感知具体子类。
*
* @param exchangeId - 交易所标识(如 "binance"
* @returns 对应交易所的 RestClient 实例
* @throws 如果 exchangeId 未注册
*/
export function createRestClient(exchangeId: string): BaseRestClient {
const factory = registry[exchangeId];
if (!factory) {
const supported = Object.keys(registry).join(", ");
throw new Error(
`[exchanges] 不支持的交易所: "${exchangeId}",当前支持: ${supported}`,
);
}
return factory();
}
/**
* 拉取历史 K 线(静态便捷方法)。
*
* 根据 exchange + type 自动路由到正确的交易所客户端,
* 调用方无需手动创建 client 或选择 spot/futures 子类。
*
* 路由规则:
* - type = "spot" → exchange(如 "binance"
* - type = "um"/"cm" → `${exchange}_futures`(如 "binance_futures"
*
* @param params - fetchKlines 统一参数对象
* @returns 标准化 K 线数组,按时间升序
*
* @example
* const klines = await fetchKlines({
* exchange: 'binance',
* type: 'um',
* symbol: 'BTCUSDT',
* startTime: 1700000000000,
* limit: 500,
* });
*/
export async function fetchKlines(params: FetchKlinesParams): Promise<Kline[]> {
const exchangeId =
params.type === "spot"
? params.exchange
: `${params.exchange}_futures`;
const client = createRestClient(exchangeId);
return client.fetchKlines(params);
}
// ============================================================
// exchanges/index.ts — 交易所模块统一入口(re-export hub
// ============================================================
// REST client registry → rest-registry.ts
// WS 订阅管理 → ws-manager.ts
// ============================================================
export { createRestClient, fetchKlines } from "./rest-registry";
export { watchKline, watchKlines, unWatchKline } from "./ws-manager";
export { BaseRestClient, BaseWsClient } from "./base";
export { BinanceRestClient, BinanceFuturesRestClient } from "./binance/rest";
export { BinanceRestClient, BinanceFuturesRestClient, convertBinanceKline } from "./binance/rest";
export { BinanceWsClient } from "./binance/ws";
export { KLINE_INTERVAL_MS } from "./constants";
export { convertBinanceKline } from "./binance/rest";
// ============================================================
// WebSocket 订阅管理
// ============================================================
/** WS 实例池:按 `${exchange}:${type}` 索引,每个组合复用同一个 BinanceWsClient */
const wsInstances = new Map<string, BinanceWsClient>();
function getWsInstance(exchange: string, type: PairType): BinanceWsClient {
const key = `${exchange}:${type}`;
let instance = wsInstances.get(key);
if (!instance) {
instance = new BinanceWsClient(exchange, type);
wsInstances.set(key, instance);
}
return instance;
}
/**
* 订阅单个交易对的 WebSocket K 线流。
*
* 根据 exchange + type 自动路由到对应的 BinanceWsClient 实例,
* 同一 exchange:type 组合共享一个 WS 连接,symbol 订阅去重。
*
* @param params - 订阅参数
*
* @example
* watchKline({ exchange: 'binance', type: 'spot', symbol: 'BTCUSDT' });
*/
export function watchKline(params: { exchange: string; type: PairType; symbol: string }): void {
const instance = getWsInstance(params.exchange, params.type);
instance.watch(params.symbol);
}
/**
* 批量订阅多个交易对的 WebSocket K 线流。
* 内部逐个调用 watchKline。
*/
export function watchKlines(pairs: { exchange: string; type: PairType; symbol: string }[]): void {
for (const pair of pairs) {
watchKline(pair);
}
}
/**
* 取消订阅单个交易对的 WebSocket K 线流。
*/
export function unWatchKline(params: { exchange: string; type: PairType; symbol: string }): void {
const key = `${params.exchange}:${params.type}`;
const instance = wsInstances.get(key);
if (!instance) return;
instance.unwatch(params.symbol);
}
+61
View File
@@ -0,0 +1,61 @@
import { BaseRestClient } from "./base";
import { BinanceRestClient, BinanceFuturesRestClient } from "./binance/rest";
import type { FetchKlinesParams, Kline } from "../types";
/** 交易所 ID 到 RestClient 构造器的注册表 */
const registry: Record<string, () => BaseRestClient> = {
binance: () => new BinanceRestClient(),
binance_futures: () => new BinanceFuturesRestClient(),
};
/**
* 创建交易所 REST 客户端。
*
* 根据交易所 ID 返回对应的 RestClient 实例,
* 外部无需感知具体子类。
*
* @param exchangeId - 交易所标识(如 "binance"
* @returns 对应交易所的 RestClient 实例
* @throws 如果 exchangeId 未注册
*/
export function createRestClient(exchangeId: string): BaseRestClient {
const factory = registry[exchangeId];
if (!factory) {
const supported = Object.keys(registry).join(", ");
throw new Error(
`[exchanges] 不支持的交易所: "${exchangeId}",当前支持: ${supported}`,
);
}
return factory();
}
/**
* 拉取历史 K 线(静态便捷方法)。
*
* 根据 exchange + type 自动路由到正确的交易所客户端,
* 调用方无需手动创建 client 或选择 spot/futures 子类。
*
* 路由规则:
* - type = "spot" → exchange(如 "binance"
* - type = "um"/"cm" → `${exchange}_futures`(如 "binance_futures"
*
* @param params - fetchKlines 统一参数对象
* @returns 标准化 K 线数组,按时间升序
*
* @example
* const klines = await fetchKlines({
* exchange: 'binance',
* type: 'um',
* symbol: 'BTCUSDT',
* startTime: 1700000000000,
* limit: 500,
* });
*/
export async function fetchKlines(params: FetchKlinesParams): Promise<Kline[]> {
const exchangeId =
params.type === "spot"
? params.exchange
: `${params.exchange}_futures`;
const client = createRestClient(exchangeId);
return client.fetchKlines(params);
}
+51
View File
@@ -0,0 +1,51 @@
import { BinanceWsClient } from "./binance/ws";
import type { PairType } from "../types";
/** WS 实例池:按 `${exchange}:${type}` 索引,每个组合复用同一个 BinanceWsClient */
const wsInstances = new Map<string, BinanceWsClient>();
function getWsInstance(exchange: string, type: PairType): BinanceWsClient {
const key = `${exchange}:${type}`;
let instance = wsInstances.get(key);
if (!instance) {
instance = new BinanceWsClient(exchange, type);
wsInstances.set(key, instance);
}
return instance;
}
/**
* 订阅单个交易对的 WebSocket K 线流。
*
* 根据 exchange + type 自动路由到对应的 BinanceWsClient 实例,
* 同一 exchange:type 组合共享一个 WS 连接,symbol 订阅去重。
*
* @param params - 订阅参数
*
* @example
* watchKline({ exchange: 'binance', type: 'spot', symbol: 'BTCUSDT' });
*/
export function watchKline(params: { exchange: string; type: PairType; symbol: string }): void {
const instance = getWsInstance(params.exchange, params.type);
instance.watch(params.symbol);
}
/**
* 批量订阅多个交易对的 WebSocket K 线流。
* 内部逐个调用 watchKline。
*/
export function watchKlines(pairs: { exchange: string; type: PairType; symbol: string }[]): void {
for (const pair of pairs) {
watchKline(pair);
}
}
/**
* 取消订阅单个交易对的 WebSocket K 线流。
*/
export function unWatchKline(params: { exchange: string; type: PairType; symbol: string }): void {
const key = `${params.exchange}:${params.type}`;
const instance = wsInstances.get(key);
if (!instance) return;
instance.unwatch(params.symbol);
}
+3 -4
View File
@@ -4,10 +4,10 @@
"description": "数字货币量化交易系统 - TypeScript 数据模块",
"type": "module",
"scripts": {
"dev": "bun run run/main.ts",
"dev:watch": "bun --watch run/main.ts",
"dev": "bun run run/start.ts",
"dev:watch": "bun --watch run/start.ts",
"build": "tsc",
"start": "bun run run/main.ts",
"start": "bun run run/start.ts",
"test": "vitest run",
"test:watch": "vitest",
"lint": "eslint .",
@@ -24,7 +24,6 @@
"pino": "^10.3.1",
"pino-pretty": "^13.1.3",
"pino-roll": "^4.0.0",
"rxjs": "^7.8.2",
"typeorm": "^1.0.0",
"yaml": "^2.9.0"
},
+3 -1
View File
@@ -5,6 +5,8 @@
// 由 run/start.ts 编排层在收到 "kline:saved" 时调用。
// ============================================================
import { logger } from "../utils/logger";
const REFRESH_INTERVALS = [
{ interval: "5m", ms: 300_000 },
{ interval: "15m", ms: 900_000 },
@@ -22,7 +24,7 @@ export async function checkAndRefresh(openTime: number): Promise<void> {
if (openTime % ms === 0) {
// TODO: 实现 DB 连接后接入
// await db.query(`CALL refresh_continuous_aggregate('klines_${interval}', NULL, INTERVAL '1m')`);
console.log(`[aggregate] TODO: refresh klines_${interval}`);
logger.debug({ interval }, "聚合刷新触发");
}
}
}
-26
View File
@@ -1,26 +0,0 @@
import { fetchKlines } from './exchanges';
console.log(await fetchKlines({
exchange: 'binance',
symbol: 'BTCUSDT',
startTime: 0,
limit: 2,
type: 'spot',
}));
// console.log(await coinMClient.getKlines({
// symbol: 'BTCUSD_PERP',
// interval: "1d",
// startTime: 0,
// limit: 2,
// }));
// const data = await coinMClient.getExchangeInfo();
// for (const item of data.symbols) {
// if (item.pair !== 'BTCUSD') {
// continue;
// }
// console.log(item);
// }
+1 -80
View File
@@ -1,5 +1,5 @@
// ============================================================
// types.ts — 统一行情数据类型定义与 MarketDataFeed 接口
// types.ts — 统一行情数据类型定义
// ============================================================
// 所有交易所适配器共享的数据结构和接口契约。
// 适配器负责将交易所原生数据格式转换为以下标准化类型。
@@ -11,8 +11,6 @@
// 对精度敏感场景(如 orderbook 快照)保留原始字符串
// ============================================================
import type { Observable } from "rxjs";
import type { KlineInterval, PairType } from "./kline";
// ============================================================
@@ -146,13 +144,6 @@ export interface KlineDelta {
// WebSocket 连接状态
// ============================================================
/** 连接状态枚举 */
export type ConnectionState =
| "disconnected"
| "connecting"
| "connected"
| "error";
// ============================================================
// REST 客户端配置
// ============================================================
@@ -209,76 +200,6 @@ export const DEFAULT_ADAPTER_CONFIG: AdapterConfig = {
reconnectBaseDelayMs: 3000,
maxReconnectAttempts: 10,
};
// ============================================================
// MarketDataFeed 接口 —— 所有交易所适配器必须实现
// ============================================================
/**
* 统一行情数据源接口。
*
* 每个交易所适配器实现此接口,向上层管道暴露标准化数据流。
* 使用 RxJS Observable 作为统一推送机制,pipeline 层可自由
* 组合、过滤、分流各交易所数据。
*/
export interface MarketDataFeed {
/** 交易所标识(如 "binance" */
readonly exchange: string;
/** 当前连接状态 */
readonly connectionState: ConnectionState;
/** 建立 WebSocket 连接 */
connect(): Promise<void>;
/** 断开连接 */
disconnect(): Promise<void>;
/**
* 订阅 24h 滚动 Ticker 流。
* 每笔成交触发推送(Binance: <symbol>@ticker)。
*/
subscribeTicker(symbols: string[]): Observable<Ticker>;
/**
* 订阅逐笔成交流。
* 实时推送每笔撮合成交(Binance: <symbol>@trade)。
*/
subscribeTrade(symbols: string[]): Observable<Trade>;
/**
* 订阅订单簿深度。
* depth 参数指定档位(如 5/10/20),默认 20。
*/
subscribeOrderbook(symbol: string, depth?: number): Observable<OrderBook>;
/**
* REST 拉取历史 K 线(用于补齐缺失数据或回测)。
*
* @param params - fetchKlines 统一参数对象
* @returns 标准化 K 线数组,按时间升序
*/
fetchKlines(params: FetchKlinesParams): Promise<Kline[]>;
/**
* 获取交易所交易对信息(用于自动注册到 trading_pairs 表)。
* 返回标准化后的交易对元数据。
*/
fetchMarkets(): Promise<MarketInfo[]>;
}
/** 交易对元信息(从交易所 REST API 获取) */
export interface MarketInfo {
symbol: string;
baseAsset: string;
quoteAsset: string;
pricePrecision: number;
quantityPrecision: number;
minQty?: number;
stepSize?: number;
minNotional?: number;
}
// ============================================================
// 工具类型
// ============================================================
+18 -13
View File
@@ -121,40 +121,45 @@
## 🟢 P2 — 计划改进
- [ ] **8. ARCHITECTURE.md 引用不存在的文件/模块**
- [x] **8. ARCHITECTURE.md 引用不存在的文件/模块**
- 位置:`data/ARCHITECTURE.md` 多处
- 问题:`run/main.ts`、`pipeline/`、`publisher/`、`utils/retry.ts`、`exchanges/types.ts`、`db/entities/klines/` 均不存在。架构图显示 RxJS 管道但代码中未使用
- 建议:要么创建缺失模块,要么更新 ARCHITECTURE.md 反映当前简化架构
- 方案:采用 **方案 B** — 更新 ARCHITECTURE.md 反映当前简化架构。移除对不存在文件/模块的所有引用,移除 RxJS 管道相关描述,`run/main.ts` 替换为实际存在的 `run/start.ts``dev`/`start` 脚本同步指向 `run/start.ts`
- [ ] **9. `exchanges/index.ts` 职责过重**
- [x] **9. `exchanges/index.ts` 职责过重**
- 位置:`data/exchanges/index.ts`
- 问题:同时承担 REST client registry7-30 行)和 WS 订阅管理(74-121 行),两类逻辑耦合
- 建议:拆分为 `rest-registry.ts` + `ws-manager.ts`
- 修复:拆分为 `rest-registry.ts`REST client 注册 + fetchKlines+ `ws-manager.ts`WS 实例池 + watchKline/watchKlines/unWatchKline),`index.ts` 降级为纯 re-export hub,对外 API 不变
- [ ] **10. `kline:tick` 微回补未实现**(已知问题 #7)
- 位置:`data/exchanges/binance/ws.ts:62-63` + `data/run/start.ts`
- 问题:WS 订阅后的衔接窗口期(回补完成 → subscribe 之间若跨分钟边界)可能导致最多 1 分钟数据缺口。文档方案已明确,待实现
- 建议:`ws.ts` 中 `final === false` 时 emit `kline:tick``start.ts` 首次 tick 触发微回补
- [ ] **11. `fetchMarkets()` 返回空数组**
- [x] **11. `fetchMarkets()` 返回空数组**
- 位置:`data/exchanges/binance/rest.ts:133,194`
- 问题:TODO 未实现,静默返回 `[]`,调用方无感知
- 建议:实现交易所信息拉取或明确声明暂不支持并抛异常
- 处理:**移除 `fetchMarkets()` 及 `MarketInfo` 类型**。其设计目的为"自动注册交易对",当前所有交易对人工筛选配置,不需要自动发现。涉及 `types/base.ts`MarketInfo 接口)、`exchanges/base.ts`(抽象方法+JSDoc)、`exchanges/binance/rest.ts`(两处实现)
- [ ] **12. 无批量写入缓冲**
- [x] **12. 无批量写入缓冲**
- 位置:`data/run/start.ts:71`
- 问题:每条 `kline:update` 直接 `upsertOrUpdateKlines([kline])`,每个交易对每分钟 1 次 DB 写入。4 个交易对无问题,100+ 交易对可能成为瓶颈
- 建议:参考 ARCHITECTURE.md 的 `KlineWriter` 设计,实现 500 条/1s 批量缓冲
- 问题:每条 `kline:update` 直接 `upsertOrUpdateKlines([kline])`。WS 每秒推送但仅 `final === true` 时 emit(见 `ws.ts:66`
- 计算方法:
- **瞬时 TPS = N**(所有交易对 1m K 线在每分钟第 0 秒附近集中闭合,几乎同时到达)
- **平均 TPS = N / 60**(每分钟 N 条,摊到 60 秒)
- 仅 1m 周期产生写入;5m/15m/1h/4h/1d 走聚合视图,不额外写入
- 当前:**8 交易对 → 瞬时 8 tps**TypeORM 单条 upsert~2-5ms/条)+ `skipUpdateIfNoValuesChanged`,串行总耗时 ~16-40ms,连接池完全过剩。暂不实施
- 触发条件:瞬时 TPS ≥ 100(≈ 单秒写入耗时 > 200ms)或引入逐笔成交实时写入时重新评估
- [ ] **13. `aggregate.ts` 混用 `console.log`**
- [x] **13. `aggregate.ts` 混用 `console.log`**
- 位置:`data/service/aggregate.ts:25`
- 问题:TODO stub 使用 `console.log` 而非 `logger`,日志格式不统一
- 建议:替换为 `logger.debug` `logger.info`
- 修复:`import { logger } from "../utils"` + `console.log` `logger.debug({ interval }, "聚合刷新触发")`
- [ ] **14. `test.ts` 位置不当**
- [x] **14. `test.ts` 位置不当**
- 位置:`data/test.ts`
- 问题:手动 smoke test 放在根目录,与自动化测试混在一起
- 建议:移入 `data/tests/manual/` 或重命名为 `smoke-test.ts`
- 处理:删除。该文件仅为 Binance SDK 临时调试脚本,无保留价值
---
+2 -2
View File
@@ -14,7 +14,7 @@ planner_max_steps = 12 # planner read-only tool-call rounds; 0 = no limit
temperature = 0.0
auto_plan = "off" # off|on; off keeps plan mode manual
reasoning_language = "zh" # visible reasoning language: auto|zh|en
# auto_plan_classifier = "deepseek-pro" # optional; only used for borderline tasks
# auto_plan_classifier = "deepseek-flash" # optional; only used for borderline tasks
soft_compact_ratio = 0.5 # notice only; keeps cache-first prefix intact
compact_ratio = 0.8 # try compacting when prompt reaches this fraction
compact_force_ratio = 0.9 # force compacting at this high-water mark
@@ -58,7 +58,7 @@ enabled = true # language server tools; servers launch lazily when used
# Rules are "Tool" or "Tool(specifier)"; e.g. Bash(go test:*), Edit(src/**).
mode = "ask"
# deny = ["Bash(rm -rf*)", "Bash(git push*)"] # hard-blocked in every mode
allow = ["Bash(cd /Users/rekey/Documents/Code/trade && git status)", "Bash(cd /Users/rekey/Documents/Code/trade && git status --short)", "Bash(cd /Users/rekey/Documents/Code/trade && git diff)", "review"]
allow = ["Bash(cd /Users/rekey/Documents/Code/trade && git status)", "Bash(cd /Users/rekey/Documents/Code/trade && git status --short)", "Bash(cd /Users/rekey/Documents/Code/trade && git diff)", "review", "task", "Edit", "Bash(cd /Users/rekey/Documents/Code/trade/data && bun run test 2>&1)", "Bash(cd /Users/rekey/Documents/Code/trade && git diff --stat)"]
# ask = ["Edit(src/**)"] # force a prompt even if otherwise allowed
[sandbox]