diff --git a/PLAN-add-futures-data.md b/PLAN-add-futures-data.md index f4f9dc4..4a823f9 100644 --- a/PLAN-add-futures-data.md +++ b/PLAN-add-futures-data.md @@ -1,198 +1,299 @@ -# 接入 USDT-M 合约数据 — 改造方案 +# 接入 USDT-M 合约数据 — 改造方案(type 字段方案) ## 设计思路 -**用 symbol 后缀区分账户类型**,而非新增 `account_type` 列。核心原则是尽量不动已有表结构和聚合视图。 +**用 `type` 列区分账户类型**,而非 symbol 后缀。`PairType` 枚举已在代码中定义(`'spot' | 'um' | 'cm'`),本次改造将 `type` 从实体层的死代码变成全链路透传的一等字段。 -### Symbol 命名约定 +### 为什么选 type 而非 .P 后缀 -| 账户类型 | symbol 示例 | 说明 | -|---------|-------------|------| -| 现货 | `BTCUSDT` | 不变 | -| USDT-M 永续 | `BTCUSDT.P` | `.P` 后缀标记合约 | -| Coin-M 永续 | `BTCUSDT_PERP` | 预留 | +| | `.P` 后缀 | `type` 字段 | +|---|---|---| +| symbol 语义 | 污染标识符,BTCUSDT.P 不是交易对名称 | 干净,symbol 与 Binance 官方一致 | +| 查询 | `WHERE symbol LIKE '%.P'` 不走索引 | `WHERE type = 'um'` 索引友好 | +| 扩展 | Coin-M 需要 `_PERP` 新后缀规则 | 加 `'cm'` 枚举值即可 | +| API 调用 | 每次 strip 后缀,容易遗漏 | symbol 原样传入,零变换 | +| 与已有代码 | 与 TradingPair.type 字段矛盾 | 与已有 PairType、实体定义完全对齐 | -**核心机制**:对外(入库、查询、展示)统一用带后缀的 symbol;对内(调用 Binance SDK)自动 strip 后缀。 +### Type 枚举 + +| type | 说明 | +|------|------| +| `spot` | 现货(默认值) | +| `um` | USDT-M 永续合约 | +| `cm` | Coin-M 永续合约(预留) | + +### K 线复合主键 + +5 列:`(exchange, symbol, type, interval, time)`。`type` 在主键中保证相同 symbol 的现货与合约 K 线不会 PK 冲突。 --- ## 改动清单 -### 1. 配置层(3 文件) +### 1. 配置层(已完成,无需再改) -**`data/env.yaml`** -```yaml -exchange: - binance: ← 保留不动(向后兼容) - api_key: "..." - api_secret: "..." - binance_futures: ← 新增 - api_key: "..." - api_secret: "..." -``` - -**`data/config/validators.ts`** -- `ExchangeConfig` 接口中 `binance: ExchangeApiKeys` 保持不动 -- 新增 `binance_futures: ExchangeApiKeys` -- `validateConfig()` 中新增解析 `exchange.binance_futures` - -**`data/config/index.ts`** -- 导出 `exchange.binance`(已有)+ 新增 `exchange.binanceFutures` +`data/env.yaml`、`data/config/validators.ts`、`data/config/index.ts` 已有 `binance_futures` 段,无需改动。 --- -### 2. REST 客户端(1 文件,核心改动) +### 2. 类型定义(1 文件) -**`data/exchanges/rest.ts`** +**`data/types/base.ts`** — Kline 接口加 `type` 字段 ```typescript -import { USDMClient } from "binance"; // 新增引入 - -// 工具函数:提取裸 symbol(移除 .P / _PERP 后缀) -function stripSuffix(symbol: string): string { - return symbol.replace(/\.(P|PERP)$/, ""); -} - -// 判断是否为合约 symbol -function isFuturesSymbol(symbol: string): boolean { - return symbol.endsWith(".P") || symbol.endsWith("_PERP"); +export interface Kline { + exchange: string; + symbol: string; + /** 交易对类型(spot / um / cm) */ + type: PairType; // ← 新增 + interval: KlineInterval; + openTime: number; + closeTime: number; + // ... 其余字段不变 } ``` -现有 `fetchBinanceKlines()` 保持不变(处理现货)。新增 `fetchFuturesKlines()`: +--- + +### 3. 实体层(1 文件) + +**`data/db/entities/kline.entity.ts`** — `type` 从 `@PrimaryColumn` 保持不动(已存在,无需改)。 + +`trading-pair.entity.ts` — `type` 列已存在,无需改。 + +--- + +### 4. REST 客户端(2 文件,核心改动) + +**`data/exchanges/binance/rest.ts`** ```typescript -async function fetchFuturesKlines( - symbol: string, // BTCUSDT.P +import { MainClient, USDMClient } from "binance"; + +// convertBinanceKline 加 type 参数 +function convertBinanceKline( + raw: BinanceRestKline, + symbol: string, interval: KlineInterval, - startTime: number, - endTime?: number, - limit = 500, -): Promise { - const rawSymbol = stripSuffix(symbol); // BTCUSDT - const client = new USDMClient({ - api_key: exchange.binanceFutures.apiKey, - api_secret: exchange.binanceFutures.apiSecret, - }, { timeout: 3000 }); - - const rawKlines = await client.getKlines({ - symbol: rawSymbol, + type: PairType, // ← 新增 +): Kline { + return { + exchange: "binance", + symbol, + type, // ← 新增 interval, - startTime, - endTime, - limit: Math.min(limit, 1000), - }); - - return rawKlines.map(k => convertBinanceKline(k, symbol, interval)); + // ... 其余字段不变 + }; } -``` -`Client` 类 `fetchKlines()` 增加分支: +// 现货客户端(已有,加 type = 'spot') +export class BinanceRestClient extends BaseRestClient { + readonly exchange = "binance"; + private client = new MainClient({...}, { timeout: 3000 }); -```typescript -async fetchKlines(...): Promise { - switch (this.exchange) { - case "binance": - if (isFuturesSymbol(symbol)) { - return fetchFuturesKlines(symbol, interval, startTime, endTime, effectiveLimit); - } - return fetchBinanceKlines(symbol, interval, startTime, endTime, effectiveLimit); + async fetchKlines(symbol, startTime, limit, endTime): Promise { + // ... + return rawKlines.map(k => convertBinanceKline(k, symbol, "1m", "spot")); + } +} + +// 合约客户端(新增) +export class BinanceFuturesRestClient extends BaseRestClient { + readonly exchange = "binance"; + private client = new USDMClient( + { api_key: exchange.binanceFutures.apiKey, api_secret: exchange.binanceFutures.apiSecret }, + { timeout: 3000 } + ); + + async fetchKlines(symbol, startTime, limit, endTime): Promise { + // USDMClient.getKlines() 与 MainClient 同构 12 元组,convertBinanceKline 直接复用 + const rawKlines = await this.client.getKlines({ symbol, interval: "1m", ... }); + return rawKlines.map(k => convertBinanceKline(k, symbol, "1m", "um")); } } ``` -**关键**:`convertBinanceKline()` 传入原始 `"BTCUSDT.P"`,转换结果中 `kline.symbol` 自然就是 `"BTCUSDT.P"`,入库后与现货 `"BTCUSDT"` 不会 PK 冲突。 +**`data/exchanges/index.ts`** — 注册合约客户端 + +```typescript +const registry: Record BaseRestClient> = { + binance: () => new BinanceRestClient(), + binance_futures: () => new BinanceFuturesRestClient(), // ← 新增 +}; +``` --- -### 3. 服务层(1 文件) +### 5. 服务层(1 文件) **`data/service/kline.ts`** -- 不需任何改动。`upsertOrUpdateKlines()` 直接把 `Kline.symbol` 写入 `KlineEntity.symbol`,后缀天然带进去。 +```typescript +const entities = KlineItems.map((item) => { + const entity = new Kline(); + entity.type = item.type; // ← 新增 + // ... 其余字段映射不变 +}); ---- - -### 4. 实体层(0 文件) - -**结论:不需要改。** - -- `kline.entity.ts` 的 4 列 PK 保持不变 -- `trading-pair.entity.ts` 不需要加 `account_type`,用 symbol 本身的 `.P` 后缀标识即可 - -TradingPair 的 symbol 设成 `"BTCUSDT.P"`,唯一约束 `(exchange_id, symbol)` 自动不冲突。 - ---- - -### 5. SQL 初始化脚本(1 文件) - -**`data/db/init-db/02-init-tables.sql`** - -种子数据新增合约交易对: - -```sql -INSERT INTO trading_pairs (exchange_id, symbol, base_asset, quote_asset, - price_precision, quantity_precision, kline_interval, kline_intervals, active) -SELECT - e.id, - sym.symbol, - sym.base, - sym.quote, - 2, 5, '1m', '1m,5m,15m,30m,1h,4h,1d,1w', TRUE -FROM exchanges e -CROSS JOIN ( - VALUES - ('BTCUSDT.P', 'BTC', 'USDT'), - ('ETHUSDT.P', 'ETH', 'USDT') -) AS sym(symbol, base, quote) -WHERE e.name = 'binance' -ON CONFLICT (exchange_id, symbol) DO NOTHING; +await repo.upsert(entities, { + conflictPaths: ["exchange", "symbol", "type", "interval", "time"], // ← +type + skipUpdateIfNoValuesChanged: true, +}); ``` -**klines 表结构和连续聚合视图:完全不动。** symbol 值不同,数据天然隔离。 - --- ### 6. 运行脚本(1 文件) -**`data/run/exchange.ts`** +**`data/run/exchange.ts`** — 按 `pair.type` 选择客户端 -- `getAllPairs()` 不受影响,返回的 `symbol` 本身就是 `"BTCUSDT.P"` -- `new Client("binance")` 的 `fetchKlines()` 内部已按 symbol 后缀分派到 `USDMClient` -- 回补循环逻辑不变,`lastBackfillTime` 追踪机制不变 - -**注意**:Binance 的 `USDMClient.getKlines()` 返回与 `MainClient.getKlines()` 同构的 12 元组,`convertBinanceKline()` 可以直接复用。 +```typescript +for (const pair of allPairs) { + const exchangeId = pair.type === 'um' ? 'binance_futures' : 'binance'; + const client = createRestClient(exchangeId); + // ... 其余逻辑不变 +} +``` --- -### 7. 类型定义(0 文件) +### 7. SQL 初始化脚本(2 文件) -`Kline.symbol` 字段类型已是 `string`,直接存 `"BTCUSDT.P"`。无需改动。 +**`data/db/init-db/02-init-tables.sql`** + +klines 表: +```sql +CREATE TABLE IF NOT EXISTS klines ( + exchange TEXT NOT NULL, + symbol TEXT NOT NULL, + type TEXT NOT NULL DEFAULT 'spot', -- 新增 + interval TEXT NOT NULL, + time TIMESTAMPTZ NOT NULL, + -- OHLCV ... + PRIMARY KEY (exchange, symbol, type, interval, time) -- 5 列 +); + +ALTER TABLE klines SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'exchange,symbol,type', -- +type + timescaledb.compress_orderby = 'time DESC' +); +``` + +trading_pairs 表: +```sql +CREATE TABLE IF NOT EXISTS trading_pairs ( + -- ... + symbol VARCHAR(20) NOT NULL, + type TEXT NOT NULL DEFAULT 'spot', -- 新增 + -- ... + CONSTRAINT uq_trading_pairs_exchange_symbol_type UNIQUE (exchange_id, symbol, type) -- +type +); +``` + +种子数据: +```sql +INSERT INTO trading_pairs (exchange_id, symbol, type, base_asset, quote_asset, + price_precision, quantity_precision, active) +SELECT e.id, sym.symbol, sym.type, sym.base, sym.quote, 2, 5, TRUE +FROM exchanges e +CROSS JOIN ( + VALUES + ('BTCUSDT', 'spot', 'BTC', 'USDT'), + ('ETHUSDT', 'spot', 'ETH', 'USDT'), + ('BNBUSDT', 'spot', 'BNB', 'USDT'), + ('SOLUSDT', 'spot', 'SOL', 'USDT'), + ('BTCUSDT', 'um', 'BTC', 'USDT'), + ('ETHUSDT', 'um', 'ETH', 'USDT') +) AS sym(symbol, type, base, quote) +WHERE e.name = 'binance' +ON CONFLICT (exchange_id, symbol, type) DO NOTHING; +``` + +**`data/db/init-db/03-continuous-aggregates.sql`** + +12 个连续聚合视图,每个改 3 处(SELECT / GROUP BY / INDEX 各加 `type`)。以 `klines_3m` 为例: + +```sql +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_3m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('3 minutes', time) AS time, + exchange, + symbol, + type, -- 新增 + '3m'::text AS interval, + FIRST(open, time) AS open, + -- ... +FROM klines +GROUP BY time_bucket('3 minutes', klines.time), exchange, symbol, type -- +type +WITH NO DATA; + +CREATE INDEX IF NOT EXISTS idx_klines_3m_symbol_time + ON klines_3m (exchange, symbol, type, time DESC); -- +type +``` + +--- + +### 8. service/pair.ts(1 文件,小改) + +`getPairLastBackfillTime` / `updatePairLastBackfillTime` 按 `symbol` 查找时需要限定 `type`,避免现货/合约二义性: + +```typescript +// 推荐:type 参数可选,默认 'spot' 向后兼容 +export async function getPairLastBackfillTime(symbol: string, type: PairType = 'spot') { + const pair = await repo.findOneBy({ symbol, type }); + return pair?.last_backfill_time; +} +``` + +实际调用处(`run/exchange.ts` 中)传 `pair.type`。 --- ## 改动汇总 -| 文件 | 改动类型 | 行数估算 | -|------|---------|---------| -| `data/env.yaml` | 新增 `binance_futures` 段 | +4 | -| `data/config/validators.ts` | `ExchangeConfig` + `validateConfig()` 新增解析 | +10 | -| `data/config/index.ts` | 新增 `exchange.binanceFutures` 导出 | +5 | -| `data/exchanges/rest.ts` | 引入 `USDMClient`,新增 `fetchFuturesKlines()`,`Client.fetchKlines()` 增加分支 | +40 | -| `data/db/init-db/02-init-tables.sql` | seed 数据插入合约交易对 | +15 | -| `data/run/exchange.ts` | 无实质改动(后缀自动路由) | 0 | +| 文件 | 改动 | 行数 | +|------|------|------| +| `data/types/base.ts` | Kline 接口 +type | +1 | +| `data/db/entities/kline.entity.ts` | 不动(type 已存在) | 0 | +| `data/exchanges/binance/rest.ts` | 引入 USDMClient,拆现货/合约客户端,convertBinanceKline +type | +50 | +| `data/exchanges/index.ts` | 注册 binance_futures | +2 | +| `data/service/kline.ts` | entity.type 赋值,conflictPaths +1 | +3 | +| `data/run/exchange.ts` | 按 pair.type 选择客户端 | +3 | +| `data/service/pair.ts` | findOneBy 加 type 参数 | +5 | +| `data/db/init-db/02-init-tables.sql` | klines +type列+PK+压缩键,trading_pairs +唯一约束+种子数据 | +10 | +| `data/db/init-db/03-continuous-aggregates.sql` | 12 视图 × 3 处(SELECT/GROUP BY/INDEX) | +36 | -**不动**:klines 表结构、复合主键、连续聚合视图、service/kline.ts、types 目录、实体层。 - -共 **5-6 个文件,~70 行净改动**。 +**共 9 个文件,~110 行净改动。** 不动:配置层、env.yaml。 --- -## 工作顺序 +## 迁移计划(重建数据库) -1. **env.yaml** — 配好 futures 的 API Key -2. **config/validators.ts + config/index.ts** — 解析 + 导出 futures Key -3. **exchanges/rest.ts** — 核心改动,USDMClient + 后缀分派 -4. **02-init-tables.sql** — 种子数据 -5. 验证:跑 `bun run data/run/exchange.ts` 看能否拉下 `BTCUSDT.P` 的 K 线 -6. Coin-M 同理扩展(只加 suffix 规则 + CoinMClient case) +``` +第1步: 修改 SQL DDL + ├── 02-init-tables.sql: klines PK 加 type,trading_pairs 唯一约束加 type + └── 03-continuous-aggregates.sql: 12 视图 SELECT/GROUP BY/INDEX 加 type + +第2步: 修改代码 + ├── types/base.ts: Kline 接口 +type + ├── exchanges/binance/rest.ts: 拆现货/合约客户端 + ├── exchanges/index.ts: 注册 binance_futures + ├── service/kline.ts: entity.type + conflictPaths + ├── service/pair.ts: findOneBy 加 type + └── run/exchange.ts: 按 pair.type 选客户端 + +第3步: 重建数据库 + ├── docker compose down -v && docker compose up -d (清空数据) + ├── 执行 01 → 02 → 03 SQL 初始化脚本 + └── bun run data/run/exchange.ts 全量回补 +``` + +--- + +## 注意事项 + +- `binance_futures` 的 API Key 需要在 Binance 开通合约交易权限,否则 `USDMClient` 调用会报权限错误。 +- 合约 K 线数量通常多于现货(7×24 交易),回补时间更长。 +- Coin-M 接入只需:`PairType` 已有 `'cm'`,新增 `CoinMClient` 类,种子数据加 `('BTCUSDT', 'cm', ...)` 即可。 diff --git a/data/db/entities/kline.entity.ts b/data/db/entities/kline.entity.ts index 745a079..75494f2 100644 --- a/data/db/entities/kline.entity.ts +++ b/data/db/entities/kline.entity.ts @@ -24,7 +24,7 @@ import { UpdateDateColumn, } from "typeorm"; -import type { KlineInterval } from '../../types'; +import type { KlineInterval, PairType } from '../../types'; /** * 1 分钟 K 线 Hypertable @@ -40,7 +40,7 @@ import type { KlineInterval } from '../../types'; compression: { compress: true, compress_orderby: "time DESC", - compress_segmentby: "exchange, symbol", + compress_segmentby: "exchange, symbol, type", policy: { schedule_interval: "30 days", // 30 天后自动压缩 }, @@ -65,6 +65,10 @@ export class Kline { @PrimaryColumn("text") symbol!: string; + /** 交易对类型(如 spot) */ + @PrimaryColumn("text", { default: 'spot' }) + type!: PairType; + /** K 线周期(1m) */ @PrimaryColumn("text") interval!: KlineInterval; diff --git a/data/db/entities/trading-pair.entity.ts b/data/db/entities/trading-pair.entity.ts index 14b3444..0c4199f 100644 --- a/data/db/entities/trading-pair.entity.ts +++ b/data/db/entities/trading-pair.entity.ts @@ -21,9 +21,10 @@ import { } from "typeorm"; import { Exchange } from "./exchange.entity"; import { CommonBaseEntity } from "./common.entity"; +import type { KlineInterval, PairType } from '../../types'; @Entity("trading_pairs") -@Index(["exchange", "symbol"], { unique: true }) // 同一交易所下 symbol 唯一 +@Index(["exchange", "symbol", "type"], { unique: true }) // 同一交易所下 symbol + type 唯一 @Index(["active"]) // 按激活状态快速筛选 export class TradingPair extends CommonBaseEntity { /** 所属交易所 */ @@ -35,6 +36,10 @@ export class TradingPair extends CommonBaseEntity { @Column("varchar", { length: 20 }) symbol!: string; + /** 交易对类型(如 spot) */ + @Column("text", { default: 'spot' }) + type!: PairType; + /** 基础币种(如 BTC) */ @Column("varchar", { length: 10 }) base_asset!: string; diff --git a/data/db/init-db/02-init-tables.sql b/data/db/init-db/02-init-tables.sql index efd3f46..b0aa34c 100644 --- a/data/db/init-db/02-init-tables.sql +++ b/data/db/init-db/02-init-tables.sql @@ -69,6 +69,9 @@ CREATE TABLE IF NOT EXISTS trading_pairs ( -- 交易对符号(如 BTCUSDT / ETHUSDT) symbol VARCHAR(20) NOT NULL, + -- 交易对类型(spot / um / cm) + type TEXT NOT NULL DEFAULT 'spot', + -- 基础币种(如 BTC) base_asset VARCHAR(10) NOT NULL, @@ -104,14 +107,14 @@ CREATE TABLE IF NOT EXISTS trading_pairs ( created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - -- 同一交易所下 symbol 唯一 - CONSTRAINT uq_trading_pairs_exchange_symbol UNIQUE (exchange_id, symbol) + -- 同一交易所下 symbol + type 唯一 + CONSTRAINT uq_trading_pairs_exchange_symbol_type UNIQUE (exchange_id, symbol, type) ); -- 按激活状态快速筛选 CREATE INDEX IF NOT EXISTS idx_trading_pairs_active ON trading_pairs (active); -- 按交易所+交易对查询(最常用模式) -CREATE INDEX IF NOT EXISTS idx_trading_pairs_exchange_symbol ON trading_pairs (exchange_id, symbol); +CREATE INDEX IF NOT EXISTS idx_trading_pairs_exchange_symbol ON trading_pairs (exchange_id, symbol, type); -- ============================================================ @@ -124,7 +127,7 @@ CREATE INDEX IF NOT EXISTS idx_trading_pairs_exchange_symbol ON trading_pairs (e -- TimescaleDB 配置: -- - chunk_time_interval: 7 days(周分区;1 day→7 days 减少 7× chunk 数) -- - 列式压缩:7 天后自动执行(压缩率 ~92%) --- - 压缩分段键:exchange, symbol(同交易对聚合压缩;interval 固定 1m 无需分段) +-- - 压缩分段键:exchange, symbol, type(同交易对+类型聚合压缩;interval 固定 1m 无需分段) -- - 压缩排序键:time DESC(查询通常按时间降序) -- -- chunk 大小选择指南(16GB / i3-7300U / 1TB SSD): @@ -145,6 +148,9 @@ CREATE TABLE IF NOT EXISTS klines ( -- 交易对符号(如 BTCUSDT) symbol TEXT NOT NULL, + -- 交易对类型(spot / um / cm) + type TEXT NOT NULL DEFAULT 'spot', + -- K 线周期(固定 "1m",基表仅存 1 分钟) interval TEXT NOT NULL, @@ -202,8 +208,8 @@ CREATE TABLE IF NOT EXISTS klines ( created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - -- 复合主键:同一交易所、同一交易对、同一周期、同一时间的 K 线唯一 - PRIMARY KEY (exchange, symbol, interval, time) + -- 复合主键:同一交易所、同一交易对、同一类型、同一周期、同一时间的 K 线唯一 + PRIMARY KEY (exchange, symbol, type, interval, time) ); -- ============================================================ @@ -222,10 +228,9 @@ SELECT create_hypertable('klines', 'time', -- ============================================================ -- 启用列式压缩(先启用压缩,再设置分段/排序键) --- 注意:interval 在基表固定为 '1m',从 segmentby 中移除以减少压缩分段数 ALTER TABLE klines SET ( timescaledb.compress, - timescaledb.compress_segmentby = 'exchange,symbol', + timescaledb.compress_segmentby = 'exchange,symbol,type', timescaledb.compress_orderby = 'time DESC' ); @@ -310,12 +315,13 @@ INSERT INTO exchanges (name, label, enabled, config) VALUES '{"rateLimit": 600, "minOrderSize": 0.001, "feeTaker": 0.001, "feeMaker": 0.001}'::jsonb) ON CONFLICT (name) DO NOTHING; --- 默认交易对(仅 Binance 主流 USDT 永续合约,幂等) -INSERT INTO trading_pairs (exchange_id, symbol, base_asset, quote_asset, +-- 默认交易对(幂等) +INSERT INTO trading_pairs (exchange_id, symbol, type, base_asset, quote_asset, price_precision, quantity_precision, active) SELECT e.id, sym.symbol, + sym.type, sym.base, sym.quote, 2, -- price_precision(USDT 计价通常 2 位小数) @@ -324,10 +330,12 @@ SELECT FROM exchanges e CROSS JOIN ( VALUES - ('BTCUSDT', 'BTC', 'USDT'), - ('ETHUSDT', 'ETH', 'USDT'), - ('BNBUSDT', 'BNB', 'USDT'), - ('SOLUSDT', 'SOL', 'USDT') -) AS sym(symbol, base, quote) + ('BTCUSDT', 'spot', 'BTC', 'USDT'), + ('ETHUSDT', 'spot', 'ETH', 'USDT'), + ('BNBUSDT', 'spot', 'BNB', 'USDT'), + ('SOLUSDT', 'spot', 'SOL', 'USDT'), + ('BTCUSDT', 'um', 'BTC', 'USDT'), + ('ETHUSDT', 'um', 'ETH', 'USDT') +) AS sym(symbol, type, base, quote) WHERE e.name = 'binance' -ON CONFLICT (exchange_id, symbol) DO NOTHING; +ON CONFLICT (exchange_id, symbol, type) DO NOTHING; diff --git a/data/db/init-db/03-continuous-aggregates.sql b/data/db/init-db/03-continuous-aggregates.sql index 69306a0..cae593c 100644 --- a/data/db/init-db/03-continuous-aggregates.sql +++ b/data/db/init-db/03-continuous-aggregates.sql @@ -4,6 +4,8 @@ -- 从 klines(1m)基表创建分层连续聚合物化视图链: -- 1m → 3m → 5m → 15m → 30m → 1h → 2h → 4h → 6h → 8h → 1d → 1w → 1mon -- +-- GROUP BY 包含 type 列,现货与合约数据隔离聚合。 +-- -- 执行前提: -- 1. klines hypertable 已创建(由 02-init-tables.sql 创建) -- 2. klines 表中已有数据(至少一条,否则视图创建成功但无数据) @@ -45,6 +47,7 @@ SELECT time_bucket('3 minutes', time) AS time, exchange, symbol, + type, '3m'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -56,7 +59,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines -GROUP BY time_bucket('3 minutes', klines.time), exchange, symbol +GROUP BY time_bucket('3 minutes', klines.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -76,6 +79,7 @@ SELECT time_bucket('5 minutes', time) AS time, exchange, symbol, + type, '5m'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -87,7 +91,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines -GROUP BY time_bucket('5 minutes', klines.time), exchange, symbol +GROUP BY time_bucket('5 minutes', klines.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -107,6 +111,7 @@ SELECT time_bucket('15 minutes', time) AS time, exchange, symbol, + type, '15m'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -118,7 +123,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_5m -GROUP BY time_bucket('15 minutes', klines_5m.time), exchange, symbol +GROUP BY time_bucket('15 minutes', klines_5m.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -138,6 +143,7 @@ SELECT time_bucket('30 minutes', time) AS time, exchange, symbol, + type, '30m'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -149,7 +155,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_15m -GROUP BY time_bucket('30 minutes', klines_15m.time), exchange, symbol +GROUP BY time_bucket('30 minutes', klines_15m.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -169,6 +175,7 @@ SELECT time_bucket('1 hour', time) AS time, exchange, symbol, + type, '1h'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -180,7 +187,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_30m -GROUP BY time_bucket('1 hour', klines_30m.time), exchange, symbol +GROUP BY time_bucket('1 hour', klines_30m.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -200,6 +207,7 @@ SELECT time_bucket('2 hours', time) AS time, exchange, symbol, + type, '2h'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -211,7 +219,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_1h -GROUP BY time_bucket('2 hours', klines_1h.time), exchange, symbol +GROUP BY time_bucket('2 hours', klines_1h.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -231,6 +239,7 @@ SELECT time_bucket('4 hours', time) AS time, exchange, symbol, + type, '4h'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -242,7 +251,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_1h -GROUP BY time_bucket('4 hours', klines_1h.time), exchange, symbol +GROUP BY time_bucket('4 hours', klines_1h.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -262,6 +271,7 @@ SELECT time_bucket('6 hours', time) AS time, exchange, symbol, + type, '6h'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -273,7 +283,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_1h -GROUP BY time_bucket('6 hours', klines_1h.time), exchange, symbol +GROUP BY time_bucket('6 hours', klines_1h.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -293,6 +303,7 @@ SELECT time_bucket('8 hours', time) AS time, exchange, symbol, + type, '8h'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -304,7 +315,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_4h -GROUP BY time_bucket('8 hours', klines_4h.time), exchange, symbol +GROUP BY time_bucket('8 hours', klines_4h.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -324,6 +335,7 @@ SELECT time_bucket('1 day', time) AS time, exchange, symbol, + type, '1d'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -335,7 +347,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_4h -GROUP BY time_bucket('1 day', klines_4h.time), exchange, symbol +GROUP BY time_bucket('1 day', klines_4h.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -355,6 +367,7 @@ SELECT time_bucket('1 week', time) AS time, exchange, symbol, + type, '1w'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -366,7 +379,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_1d -GROUP BY time_bucket('1 week', klines_1d.time), exchange, symbol +GROUP BY time_bucket('1 week', klines_1d.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -386,6 +399,7 @@ SELECT time_bucket('1 month', time) AS time, exchange, symbol, + type, '1mon'::text AS interval, FIRST(open, time) AS open, MAX(high) AS high, @@ -397,7 +411,7 @@ SELECT SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, SUM(trade_count)::integer AS trade_count FROM klines_1d -GROUP BY time_bucket('1 month', klines_1d.time), exchange, symbol +GROUP BY time_bucket('1 month', klines_1d.time), exchange, symbol, type WITH NO DATA; -- 【模式 A 用户】取消下面注释以启用定时调度刷新 @@ -409,29 +423,26 @@ WITH NO DATA; -- ); -- ============================================================ --- 推荐索引:加速按 symbol + time 的查询 +-- 推荐索引:加速按 symbol + type + time 的查询 -- ============================================================ -CREATE INDEX IF NOT EXISTS idx_klines_3m_symbol_time ON klines_3m (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_5m_symbol_time ON klines_5m (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_15m_symbol_time ON klines_15m (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_30m_symbol_time ON klines_30m (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_1h_symbol_time ON klines_1h (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_2h_symbol_time ON klines_2h (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_4h_symbol_time ON klines_4h (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_6h_symbol_time ON klines_6h (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_8h_symbol_time ON klines_8h (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_1d_symbol_time ON klines_1d (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_1w_symbol_time ON klines_1w (exchange, symbol, time DESC); -CREATE INDEX IF NOT EXISTS idx_klines_1mon_symbol_time ON klines_1mon (exchange, symbol, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_3m_symbol_time ON klines_3m (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_5m_symbol_time ON klines_5m (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_15m_symbol_time ON klines_15m (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_30m_symbol_time ON klines_30m (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_1h_symbol_time ON klines_1h (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_2h_symbol_time ON klines_2h (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_4h_symbol_time ON klines_4h (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_6h_symbol_time ON klines_6h (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_8h_symbol_time ON klines_8h (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_1d_symbol_time ON klines_1d (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_1w_symbol_time ON klines_1w (exchange, symbol, type, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_1mon_symbol_time ON klines_1mon (exchange, symbol, type, time DESC); -- ============================================================ -- 截面查询索引:加速同一时间点多品种回测查询 --- 查询模式:WHERE exchange='binance' AND time='2024-01-01' AND symbol IN (…) --- 回测中跨品种截面查询最常见于日线和周线,因此只在这两层建额外索引。 --- 如需其他周期(如 1h、4h)的截面查询,按同样模式扩展。 -- ============================================================ -CREATE INDEX IF NOT EXISTS idx_klines_1d_exchange_time_symbol ON klines_1d (exchange, time DESC, symbol); -CREATE INDEX IF NOT EXISTS idx_klines_1w_exchange_time_symbol ON klines_1w (exchange, time DESC, symbol); +CREATE INDEX IF NOT EXISTS idx_klines_1d_exchange_time_symbol ON klines_1d (exchange, time DESC, symbol, type); +CREATE INDEX IF NOT EXISTS idx_klines_1w_exchange_time_symbol ON klines_1w (exchange, time DESC, symbol, type); -- ============================================================ -- 首次创建后手动刷新所有视图(填充历史数据) diff --git a/data/exchanges/binance/rest.ts b/data/exchanges/binance/rest.ts index 04100c9..f486978 100644 --- a/data/exchanges/binance/rest.ts +++ b/data/exchanges/binance/rest.ts @@ -1,9 +1,9 @@ -import { MainClient, type Kline as BinanceRestKline } from "binance"; +import { MainClient, USDMClient, type Kline as BinanceRestKline } from "binance"; import { logger } from "../../utils/logger"; import { exchange } from "../../config"; import { BaseRestClient } from "../base"; -import type { Kline, MarketInfo, KlineInterval } from "../../types"; +import type { Kline, MarketInfo, KlineInterval, PairType } from "../../types"; // ============================================================ // Binance REST K 线 → 本系统标准化 Kline 转换 @@ -32,6 +32,7 @@ function convertBinanceKline( raw: BinanceRestKline, symbol: string, interval: KlineInterval, + type: PairType, ): Kline { const [ openTime, @@ -51,6 +52,7 @@ function convertBinanceKline( return { exchange: "binance", symbol, + type, interval, openTime, closeTime, @@ -130,7 +132,7 @@ export class BinanceRestClient extends BaseRestClient { // 按 openTime 升序排序(防御性),转换为标准化 Kline return rawKlines - .map((k) => convertBinanceKline(k, symbol, "1m")) + .map((k) => convertBinanceKline(k, symbol, "1m", "spot")) .sort((a, b) => a.openTime - b.openTime); } @@ -139,3 +141,67 @@ export class BinanceRestClient extends BaseRestClient { return []; } } + +// ============================================================ +// BinanceFuturesRestClient — USDT-M 永续合约 +// ============================================================ + +export class BinanceFuturesRestClient extends BaseRestClient { + readonly exchange = "binance_futures"; + + private client: USDMClient; + + constructor() { + super(); + this.client = new USDMClient( + { + api_key: exchange.binanceFutures.apiKey, + api_secret: exchange.binanceFutures.apiSecret, + }, + { timeout: 3000 }, + ); + } + + /** + * 拉取 USDT-M 永续合约 1m K 线。 + * + * USDMClient.getKlines() 返回与 MainClient 同构的 12 元组, + * convertBinanceKline 直接复用,type 固定为 'um'。 + */ + async fetchKlines( + symbol: string, + startTime: number, + limit?: number, + endTime?: number, + ): Promise { + const effectiveLimit = limit ?? this.config.defaultLimit; + const safeLimit = Math.min(effectiveLimit, 1000); + + await this.throttle(`${symbol}:1m`); + + const rawKlines = await this.client.getKlines({ + symbol, + interval: "1m", + startTime, + endTime, + limit: safeLimit, + }); + + logger.debug( + { symbol, interval: "1m", startTime, endTime, limit: safeLimit }, + "[binance_futures] fetchKlines 请求参数", + ); + + if (!rawKlines || rawKlines.length === 0) { + return []; + } + + return rawKlines + .map((k) => convertBinanceKline(k, symbol, "1m", "um")) + .sort((a, b) => a.openTime - b.openTime); + } + + async fetchMarkets(): Promise { + return []; + } +} diff --git a/data/exchanges/index.ts b/data/exchanges/index.ts index 98a4c11..f54e177 100644 --- a/data/exchanges/index.ts +++ b/data/exchanges/index.ts @@ -1,9 +1,10 @@ import { BaseRestClient } from "./base"; -import { BinanceRestClient } from "./binance/rest"; +import { BinanceRestClient, BinanceFuturesRestClient } from "./binance/rest"; /** 交易所 ID 到 RestClient 构造器的注册表 */ const registry: Record BaseRestClient> = { binance: () => new BinanceRestClient(), + binance_futures: () => new BinanceFuturesRestClient(), }; /** @@ -28,5 +29,5 @@ export function createRestClient(exchangeId: string): BaseRestClient { } export { BaseRestClient } from "./base"; -export { BinanceRestClient } from "./binance/rest"; +export { BinanceRestClient, BinanceFuturesRestClient } from "./binance/rest"; export { KLINE_INTERVAL_MS } from "./constants"; diff --git a/data/run/exchange.ts b/data/run/exchange.ts index 0a91388..fc0b0ef 100644 --- a/data/run/exchange.ts +++ b/data/run/exchange.ts @@ -11,7 +11,8 @@ function getNowMinuteMS() { const allPairs = await getAllPairs(); for (const pair of allPairs) { - const client = createRestClient("binance"); + const exchangeId = pair.type === 'um' ? 'binance_futures' : 'binance'; + const client = createRestClient(exchangeId); let lastBackfillTime = pair.last_backfill_time.getTime(); try { while (lastBackfillTime < getNowMinuteMS()) { @@ -26,7 +27,7 @@ for (const pair of allPairs) { await upsertOrUpdateKlines(klines); const lastK = klines[klines.length - 1]; if (lastK) { - await updatePairLastBackfillTime(lastK?.symbol, new Date(lastK.openTime)); + await updatePairLastBackfillTime(lastK?.symbol, new Date(lastK.openTime), pair.type); if (lastBackfillTime === lastK.openTime) { break; } diff --git a/data/service/bnkline.ts b/data/service/bnkline.ts index aeb8bcd..5daaafd 100644 --- a/data/service/bnkline.ts +++ b/data/service/bnkline.ts @@ -20,7 +20,4 @@ export async function fetchKlines( limit = 500, ): Promise { return client.fetchKlines(symbol, startTime, limit); -} - - -console.log(await fetchKlines('BTCUSDT.P', 0, 10)); \ No newline at end of file +} \ No newline at end of file diff --git a/data/service/kline.ts b/data/service/kline.ts index 640f906..e39d6a4 100644 --- a/data/service/kline.ts +++ b/data/service/kline.ts @@ -9,14 +9,14 @@ const repo = AppDataSource.getRepository(Kline); * 批量 UPSERT K 线数据到 TimescaleDB。 * * 映射应用层 KlineItem → 数据库实体,通过 INSERT ... ON CONFLICT DO UPDATE - * 实现幂等写入。冲突列为 [exchange, symbol, interval, time](四列复合主键), + * 实现幂等写入。冲突列为 [exchange, symbol, type, interval, time](五列复合主键), * 冲突时更新 OHLCV 及扩展字段。 * * 适用场景: * - 回补历史 K 线(幂等,重复拉取不产生重复行) * - WebSocket 实时 K 线增量刷新(更新最新一根未闭合 K 线的 high/low/close/volume) * - * 注意:依赖 Kline 实体的四列复合主键 [exchange, symbol, interval, time]。 + * 注意:依赖 Kline 实体的五列复合主键 [exchange, symbol, type, interval, time]。 * 若实体 PK 结构变更,需同步更新 conflictPaths。 * * @param KlineItems - 应用层标准化 K 线数组 @@ -35,6 +35,7 @@ export async function upsertOrUpdateKlines(KlineItems: KlineItem[]) { entity.time = new Date(item.openTime); // Unix ms → Date entity.exchange = item.exchange; entity.symbol = item.symbol; + entity.type = item.type; entity.interval = item.interval; entity.open = Number(item.open); entity.high = Number(item.high); @@ -54,11 +55,11 @@ export async function upsertOrUpdateKlines(KlineItems: KlineItem[]) { }); try { - // UPSERT: 冲突列匹配复合主键 [exchange, symbol, interval, time] - // 实体已改为四列复合 PK,ON CONFLICT 直接命中主键约束 + // UPSERT: 冲突列匹配复合主键 [exchange, symbol, type, interval, time] + // 实体已改为五列复合 PK,ON CONFLICT 直接命中主键约束 // skipUpdateIfNoValuesChanged: 减少不必要的写操作 const result = await repo.upsert(entities, { - conflictPaths: ["exchange", "symbol", "interval", "time"], + conflictPaths: ["exchange", "symbol", "type", "interval", "time"], skipUpdateIfNoValuesChanged: true, }); diff --git a/data/service/pair.ts b/data/service/pair.ts index d02fed0..b0c91a9 100644 --- a/data/service/pair.ts +++ b/data/service/pair.ts @@ -1,5 +1,6 @@ import { AppDataSource } from "../db/data-source"; import { TradingPair } from "../db/entities/trading-pair.entity"; +import type { PairType } from '../types'; const repo = AppDataSource.getRepository(TradingPair); @@ -18,11 +19,13 @@ export async function getAllPairs() { * 获取指定交易对的历史 K 线最后补全时间。 * * @param symbol - 交易对名称(如 "BTCUSDT") + * @param type - 交易对类型(默认 'spot') * @returns 最后补全时间(UTC),若交易对不存在返回 undefined */ -export async function getPairLastBackfillTime(symbol: string) { +export async function getPairLastBackfillTime(symbol: string, type: PairType = 'spot') { const pair = await repo.findOneBy({ - symbol + symbol, + type, }); return pair?.last_backfill_time; } @@ -35,11 +38,13 @@ export async function getPairLastBackfillTime(symbol: string) { * * @param symbol - 交易对名称(如 "BTCUSDT") * @param time - 新的最后补全时间(UTC) + * @param type - 交易对类型(默认 'spot') * @returns 保存后的交易对实体,若交易对不存在返回 undefined */ -export async function updatePairLastBackfillTime(symbol: string, time: Date) { +export async function updatePairLastBackfillTime(symbol: string, time: Date, type: PairType = 'spot') { const pair = await repo.findOneBy({ - symbol + symbol, + type, }); if (pair === null) { return; diff --git a/data/test.ts b/data/test.ts new file mode 100644 index 0000000..76a8fce --- /dev/null +++ b/data/test.ts @@ -0,0 +1,27 @@ +import { MainClient, type Kline as BinanceRestKline, CoinMClient, USDMClient } from "binance"; + +const usdMClient = new USDMClient(); +const coinMClient = new CoinMClient(); + +console.log(await usdMClient.getKlines({ + symbol: 'BTCUSDT', + interval: "1m", + startTime: 0, + limit: 2, +})); + +// console.log(await coinMClient.getKlines({ +// symbol: 'BTCUSD_PERP', +// interval: "1d", +// startTime: 0, +// limit: 2, +// })); + +// const data = await coinMClient.getExchangeInfo(); + +// for (const item of data.symbols) { +// if (item.pair !== 'BTCUSD') { +// continue; +// } +// console.log(item); +// } \ No newline at end of file diff --git a/data/types/base.ts b/data/types/base.ts index bc0fbca..b3d7430 100644 --- a/data/types/base.ts +++ b/data/types/base.ts @@ -13,7 +13,7 @@ import type { Observable } from "rxjs"; -import type { KlineInterval } from "./kline"; +import type { KlineInterval, PairType } from "./kline"; // ============================================================ // 标准化行情数据结构 @@ -97,6 +97,8 @@ export interface Kline { exchange: string; /** 交易对符号 */ symbol: string; + /** 交易对类型(spot / um / cm) */ + type: PairType; /** K 线周期 */ interval: KlineInterval; /** 开盘时间(Unix ms) */ diff --git a/data/types/index.ts b/data/types/index.ts index 729f04c..bc75cad 100644 --- a/data/types/index.ts +++ b/data/types/index.ts @@ -1,3 +1,4 @@ export type * from './kline'; export type * from './base'; -export * from './base'; \ No newline at end of file +export * from './base'; +export type { PairType } from './kline'; \ No newline at end of file diff --git a/data/types/kline.ts b/data/types/kline.ts index 0ddb388..281e158 100644 --- a/data/types/kline.ts +++ b/data/types/kline.ts @@ -1,3 +1,6 @@ +/** 交易对类型 */ +export type PairType = 'spot' | 'um' | 'cm'; + /** K 线周期枚举 */ export type KlineInterval = | "1m"