feat: 接入 USDT-M 合约数据 — type 字段方案

- PairType 定义移至 types/kline.ts (spot/um/cm)
- Kline 接口新增 type 字段,全链路透传
- klines 5列复合主键 (exchange, symbol, type, interval, time)
- 拆出 BinanceFuturesRestClient (USDMClient)
- exchanges/index.ts 注册 binance_futures
- trading_pairs 唯一约束加 type,种子数据加合约对
- 12个连续聚合视图 SELECT/GROUP BY/INDEX 加 type
- 清理 bnkline.ts 废弃代码和 pair.ts 空函数
This commit is contained in:
Rekey
2026-06-16 18:39:40 +08:00
parent 1adb093100
commit 705a2f6ea0
15 changed files with 442 additions and 209 deletions
+6 -2
View File
@@ -24,7 +24,7 @@ import {
UpdateDateColumn,
} from "typeorm";
import type { KlineInterval } from '../../types';
import type { KlineInterval, PairType } from '../../types';
/**
* 1 分钟 K 线 Hypertable
@@ -40,7 +40,7 @@ import type { KlineInterval } from '../../types';
compression: {
compress: true,
compress_orderby: "time DESC",
compress_segmentby: "exchange, symbol",
compress_segmentby: "exchange, symbol, type",
policy: {
schedule_interval: "30 days", // 30 天后自动压缩
},
@@ -65,6 +65,10 @@ export class Kline {
@PrimaryColumn("text")
symbol!: string;
/** 交易对类型(如 spot */
@PrimaryColumn("text", { default: 'spot' })
type!: PairType;
/** K 线周期(1m */
@PrimaryColumn("text")
interval!: KlineInterval;
+6 -1
View File
@@ -21,9 +21,10 @@ import {
} from "typeorm";
import { Exchange } from "./exchange.entity";
import { CommonBaseEntity } from "./common.entity";
import type { KlineInterval, PairType } from '../../types';
@Entity("trading_pairs")
@Index(["exchange", "symbol"], { unique: true }) // 同一交易所下 symbol 唯一
@Index(["exchange", "symbol", "type"], { unique: true }) // 同一交易所下 symbol + type 唯一
@Index(["active"]) // 按激活状态快速筛选
export class TradingPair extends CommonBaseEntity {
/** 所属交易所 */
@@ -35,6 +36,10 @@ export class TradingPair extends CommonBaseEntity {
@Column("varchar", { length: 20 })
symbol!: string;
/** 交易对类型(如 spot */
@Column("text", { default: 'spot' })
type!: PairType;
/** 基础币种(如 BTC */
@Column("varchar", { length: 10 })
base_asset!: string;
+24 -16
View File
@@ -69,6 +69,9 @@ CREATE TABLE IF NOT EXISTS trading_pairs (
-- 交易对符号(如 BTCUSDT / ETHUSDT
symbol VARCHAR(20) NOT NULL,
-- 交易对类型(spot / um / cm
type TEXT NOT NULL DEFAULT 'spot',
-- 基础币种(如 BTC
base_asset VARCHAR(10) NOT NULL,
@@ -104,14 +107,14 @@ CREATE TABLE IF NOT EXISTS trading_pairs (
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- 同一交易所下 symbol 唯一
CONSTRAINT uq_trading_pairs_exchange_symbol UNIQUE (exchange_id, symbol)
-- 同一交易所下 symbol + type 唯一
CONSTRAINT uq_trading_pairs_exchange_symbol_type UNIQUE (exchange_id, symbol, type)
);
-- 按激活状态快速筛选
CREATE INDEX IF NOT EXISTS idx_trading_pairs_active ON trading_pairs (active);
-- 按交易所+交易对查询(最常用模式)
CREATE INDEX IF NOT EXISTS idx_trading_pairs_exchange_symbol ON trading_pairs (exchange_id, symbol);
CREATE INDEX IF NOT EXISTS idx_trading_pairs_exchange_symbol ON trading_pairs (exchange_id, symbol, type);
-- ============================================================
@@ -124,7 +127,7 @@ CREATE INDEX IF NOT EXISTS idx_trading_pairs_exchange_symbol ON trading_pairs (e
-- TimescaleDB 配置:
-- - chunk_time_interval: 7 days(周分区;1 day→7 days 减少 7× chunk 数)
-- - 列式压缩:7 天后自动执行(压缩率 ~92%)
-- - 压缩分段键:exchange, symbol(同交易对聚合压缩;interval 固定 1m 无需分段)
-- - 压缩分段键:exchange, symbol, type(同交易对+类型聚合压缩;interval 固定 1m 无需分段)
-- - 压缩排序键:time DESC(查询通常按时间降序)
--
-- chunk 大小选择指南(16GB / i3-7300U / 1TB SSD):
@@ -145,6 +148,9 @@ CREATE TABLE IF NOT EXISTS klines (
-- 交易对符号(如 BTCUSDT
symbol TEXT NOT NULL,
-- 交易对类型(spot / um / cm
type TEXT NOT NULL DEFAULT 'spot',
-- K 线周期(固定 "1m",基表仅存 1 分钟)
interval TEXT NOT NULL,
@@ -202,8 +208,8 @@ CREATE TABLE IF NOT EXISTS klines (
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- 复合主键:同一交易所、同一交易对、同一周期、同一时间的 K 线唯一
PRIMARY KEY (exchange, symbol, interval, time)
-- 复合主键:同一交易所、同一交易对、同一类型、同一周期、同一时间的 K 线唯一
PRIMARY KEY (exchange, symbol, type, interval, time)
);
-- ============================================================
@@ -222,10 +228,9 @@ SELECT create_hypertable('klines', 'time',
-- ============================================================
-- 启用列式压缩(先启用压缩,再设置分段/排序键)
-- 注意:interval 在基表固定为 '1m',从 segmentby 中移除以减少压缩分段数
ALTER TABLE klines SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'exchange,symbol',
timescaledb.compress_segmentby = 'exchange,symbol,type',
timescaledb.compress_orderby = 'time DESC'
);
@@ -310,12 +315,13 @@ INSERT INTO exchanges (name, label, enabled, config) VALUES
'{"rateLimit": 600, "minOrderSize": 0.001, "feeTaker": 0.001, "feeMaker": 0.001}'::jsonb)
ON CONFLICT (name) DO NOTHING;
-- 默认交易对(仅 Binance 主流 USDT 永续合约,幂等)
INSERT INTO trading_pairs (exchange_id, symbol, base_asset, quote_asset,
-- 默认交易对(幂等)
INSERT INTO trading_pairs (exchange_id, symbol, type, base_asset, quote_asset,
price_precision, quantity_precision, active)
SELECT
e.id,
sym.symbol,
sym.type,
sym.base,
sym.quote,
2, -- price_precisionUSDT 计价通常 2 位小数)
@@ -324,10 +330,12 @@ SELECT
FROM exchanges e
CROSS JOIN (
VALUES
('BTCUSDT', 'BTC', 'USDT'),
('ETHUSDT', 'ETH', 'USDT'),
('BNBUSDT', 'BNB', 'USDT'),
('SOLUSDT', 'SOL', 'USDT')
) AS sym(symbol, base, quote)
('BTCUSDT', 'spot', 'BTC', 'USDT'),
('ETHUSDT', 'spot', 'ETH', 'USDT'),
('BNBUSDT', 'spot', 'BNB', 'USDT'),
('SOLUSDT', 'spot', 'SOL', 'USDT'),
('BTCUSDT', 'um', 'BTC', 'USDT'),
('ETHUSDT', 'um', 'ETH', 'USDT')
) AS sym(symbol, type, base, quote)
WHERE e.name = 'binance'
ON CONFLICT (exchange_id, symbol) DO NOTHING;
ON CONFLICT (exchange_id, symbol, type) DO NOTHING;
+41 -30
View File
@@ -4,6 +4,8 @@
-- 从 klines(1m)基表创建分层连续聚合物化视图链:
-- 1m → 3m → 5m → 15m → 30m → 1h → 2h → 4h → 6h → 8h → 1d → 1w → 1mon
--
-- GROUP BY 包含 type 列,现货与合约数据隔离聚合。
--
-- 执行前提:
-- 1. klines hypertable 已创建(由 02-init-tables.sql 创建)
-- 2. klines 表中已有数据(至少一条,否则视图创建成功但无数据)
@@ -45,6 +47,7 @@ SELECT
time_bucket('3 minutes', time) AS time,
exchange,
symbol,
type,
'3m'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -56,7 +59,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines
GROUP BY time_bucket('3 minutes', klines.time), exchange, symbol
GROUP BY time_bucket('3 minutes', klines.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -76,6 +79,7 @@ SELECT
time_bucket('5 minutes', time) AS time,
exchange,
symbol,
type,
'5m'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -87,7 +91,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines
GROUP BY time_bucket('5 minutes', klines.time), exchange, symbol
GROUP BY time_bucket('5 minutes', klines.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -107,6 +111,7 @@ SELECT
time_bucket('15 minutes', time) AS time,
exchange,
symbol,
type,
'15m'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -118,7 +123,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_5m
GROUP BY time_bucket('15 minutes', klines_5m.time), exchange, symbol
GROUP BY time_bucket('15 minutes', klines_5m.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -138,6 +143,7 @@ SELECT
time_bucket('30 minutes', time) AS time,
exchange,
symbol,
type,
'30m'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -149,7 +155,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_15m
GROUP BY time_bucket('30 minutes', klines_15m.time), exchange, symbol
GROUP BY time_bucket('30 minutes', klines_15m.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -169,6 +175,7 @@ SELECT
time_bucket('1 hour', time) AS time,
exchange,
symbol,
type,
'1h'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -180,7 +187,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_30m
GROUP BY time_bucket('1 hour', klines_30m.time), exchange, symbol
GROUP BY time_bucket('1 hour', klines_30m.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -200,6 +207,7 @@ SELECT
time_bucket('2 hours', time) AS time,
exchange,
symbol,
type,
'2h'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -211,7 +219,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_1h
GROUP BY time_bucket('2 hours', klines_1h.time), exchange, symbol
GROUP BY time_bucket('2 hours', klines_1h.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -231,6 +239,7 @@ SELECT
time_bucket('4 hours', time) AS time,
exchange,
symbol,
type,
'4h'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -242,7 +251,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_1h
GROUP BY time_bucket('4 hours', klines_1h.time), exchange, symbol
GROUP BY time_bucket('4 hours', klines_1h.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -262,6 +271,7 @@ SELECT
time_bucket('6 hours', time) AS time,
exchange,
symbol,
type,
'6h'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -273,7 +283,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_1h
GROUP BY time_bucket('6 hours', klines_1h.time), exchange, symbol
GROUP BY time_bucket('6 hours', klines_1h.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -293,6 +303,7 @@ SELECT
time_bucket('8 hours', time) AS time,
exchange,
symbol,
type,
'8h'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -304,7 +315,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_4h
GROUP BY time_bucket('8 hours', klines_4h.time), exchange, symbol
GROUP BY time_bucket('8 hours', klines_4h.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -324,6 +335,7 @@ SELECT
time_bucket('1 day', time) AS time,
exchange,
symbol,
type,
'1d'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -335,7 +347,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_4h
GROUP BY time_bucket('1 day', klines_4h.time), exchange, symbol
GROUP BY time_bucket('1 day', klines_4h.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -355,6 +367,7 @@ SELECT
time_bucket('1 week', time) AS time,
exchange,
symbol,
type,
'1w'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -366,7 +379,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_1d
GROUP BY time_bucket('1 week', klines_1d.time), exchange, symbol
GROUP BY time_bucket('1 week', klines_1d.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -386,6 +399,7 @@ SELECT
time_bucket('1 month', time) AS time,
exchange,
symbol,
type,
'1mon'::text AS interval,
FIRST(open, time) AS open,
MAX(high) AS high,
@@ -397,7 +411,7 @@ SELECT
SUM(taker_buy_quote_vol) AS taker_buy_quote_vol,
SUM(trade_count)::integer AS trade_count
FROM klines_1d
GROUP BY time_bucket('1 month', klines_1d.time), exchange, symbol
GROUP BY time_bucket('1 month', klines_1d.time), exchange, symbol, type
WITH NO DATA;
-- 【模式 A 用户】取消下面注释以启用定时调度刷新
@@ -409,29 +423,26 @@ WITH NO DATA;
-- );
-- ============================================================
-- 推荐索引:加速按 symbol + time 的查询
-- 推荐索引:加速按 symbol + type + time 的查询
-- ============================================================
CREATE INDEX IF NOT EXISTS idx_klines_3m_symbol_time ON klines_3m (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_5m_symbol_time ON klines_5m (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_15m_symbol_time ON klines_15m (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_30m_symbol_time ON klines_30m (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1h_symbol_time ON klines_1h (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_2h_symbol_time ON klines_2h (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_4h_symbol_time ON klines_4h (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_6h_symbol_time ON klines_6h (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_8h_symbol_time ON klines_8h (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1d_symbol_time ON klines_1d (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1w_symbol_time ON klines_1w (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1mon_symbol_time ON klines_1mon (exchange, symbol, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_3m_symbol_time ON klines_3m (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_5m_symbol_time ON klines_5m (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_15m_symbol_time ON klines_15m (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_30m_symbol_time ON klines_30m (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1h_symbol_time ON klines_1h (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_2h_symbol_time ON klines_2h (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_4h_symbol_time ON klines_4h (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_6h_symbol_time ON klines_6h (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_8h_symbol_time ON klines_8h (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1d_symbol_time ON klines_1d (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1w_symbol_time ON klines_1w (exchange, symbol, type, time DESC);
CREATE INDEX IF NOT EXISTS idx_klines_1mon_symbol_time ON klines_1mon (exchange, symbol, type, time DESC);
-- ============================================================
-- 截面查询索引:加速同一时间点多品种回测查询
-- 查询模式:WHERE exchange='binance' AND time='2024-01-01' AND symbol IN (…)
-- 回测中跨品种截面查询最常见于日线和周线,因此只在这两层建额外索引。
-- 如需其他周期(如 1h、4h)的截面查询,按同样模式扩展。
-- ============================================================
CREATE INDEX IF NOT EXISTS idx_klines_1d_exchange_time_symbol ON klines_1d (exchange, time DESC, symbol);
CREATE INDEX IF NOT EXISTS idx_klines_1w_exchange_time_symbol ON klines_1w (exchange, time DESC, symbol);
CREATE INDEX IF NOT EXISTS idx_klines_1d_exchange_time_symbol ON klines_1d (exchange, time DESC, symbol, type);
CREATE INDEX IF NOT EXISTS idx_klines_1w_exchange_time_symbol ON klines_1w (exchange, time DESC, symbol, type);
-- ============================================================
-- 首次创建后手动刷新所有视图(填充历史数据)
+69 -3
View File
@@ -1,9 +1,9 @@
import { MainClient, type Kline as BinanceRestKline } from "binance";
import { MainClient, USDMClient, type Kline as BinanceRestKline } from "binance";
import { logger } from "../../utils/logger";
import { exchange } from "../../config";
import { BaseRestClient } from "../base";
import type { Kline, MarketInfo, KlineInterval } from "../../types";
import type { Kline, MarketInfo, KlineInterval, PairType } from "../../types";
// ============================================================
// Binance REST K 线 → 本系统标准化 Kline 转换
@@ -32,6 +32,7 @@ function convertBinanceKline(
raw: BinanceRestKline,
symbol: string,
interval: KlineInterval,
type: PairType,
): Kline {
const [
openTime,
@@ -51,6 +52,7 @@ function convertBinanceKline(
return {
exchange: "binance",
symbol,
type,
interval,
openTime,
closeTime,
@@ -130,7 +132,7 @@ export class BinanceRestClient extends BaseRestClient {
// 按 openTime 升序排序(防御性),转换为标准化 Kline
return rawKlines
.map((k) => convertBinanceKline(k, symbol, "1m"))
.map((k) => convertBinanceKline(k, symbol, "1m", "spot"))
.sort((a, b) => a.openTime - b.openTime);
}
@@ -139,3 +141,67 @@ export class BinanceRestClient extends BaseRestClient {
return [];
}
}
// ============================================================
// BinanceFuturesRestClient — USDT-M 永续合约
// ============================================================
export class BinanceFuturesRestClient extends BaseRestClient {
readonly exchange = "binance_futures";
private client: USDMClient;
constructor() {
super();
this.client = new USDMClient(
{
api_key: exchange.binanceFutures.apiKey,
api_secret: exchange.binanceFutures.apiSecret,
},
{ timeout: 3000 },
);
}
/**
* 拉取 USDT-M 永续合约 1m K 线。
*
* USDMClient.getKlines() 返回与 MainClient 同构的 12 元组,
* convertBinanceKline 直接复用,type 固定为 'um'。
*/
async fetchKlines(
symbol: string,
startTime: number,
limit?: number,
endTime?: number,
): Promise<Kline[]> {
const effectiveLimit = limit ?? this.config.defaultLimit;
const safeLimit = Math.min(effectiveLimit, 1000);
await this.throttle(`${symbol}:1m`);
const rawKlines = await this.client.getKlines({
symbol,
interval: "1m",
startTime,
endTime,
limit: safeLimit,
});
logger.debug(
{ symbol, interval: "1m", startTime, endTime, limit: safeLimit },
"[binance_futures] fetchKlines 请求参数",
);
if (!rawKlines || rawKlines.length === 0) {
return [];
}
return rawKlines
.map((k) => convertBinanceKline(k, symbol, "1m", "um"))
.sort((a, b) => a.openTime - b.openTime);
}
async fetchMarkets(): Promise<MarketInfo[]> {
return [];
}
}
+3 -2
View File
@@ -1,9 +1,10 @@
import { BaseRestClient } from "./base";
import { BinanceRestClient } from "./binance/rest";
import { BinanceRestClient, BinanceFuturesRestClient } from "./binance/rest";
/** 交易所 ID 到 RestClient 构造器的注册表 */
const registry: Record<string, () => BaseRestClient> = {
binance: () => new BinanceRestClient(),
binance_futures: () => new BinanceFuturesRestClient(),
};
/**
@@ -28,5 +29,5 @@ export function createRestClient(exchangeId: string): BaseRestClient {
}
export { BaseRestClient } from "./base";
export { BinanceRestClient } from "./binance/rest";
export { BinanceRestClient, BinanceFuturesRestClient } from "./binance/rest";
export { KLINE_INTERVAL_MS } from "./constants";
+3 -2
View File
@@ -11,7 +11,8 @@ function getNowMinuteMS() {
const allPairs = await getAllPairs();
for (const pair of allPairs) {
const client = createRestClient("binance");
const exchangeId = pair.type === 'um' ? 'binance_futures' : 'binance';
const client = createRestClient(exchangeId);
let lastBackfillTime = pair.last_backfill_time.getTime();
try {
while (lastBackfillTime < getNowMinuteMS()) {
@@ -26,7 +27,7 @@ for (const pair of allPairs) {
await upsertOrUpdateKlines(klines);
const lastK = klines[klines.length - 1];
if (lastK) {
await updatePairLastBackfillTime(lastK?.symbol, new Date(lastK.openTime));
await updatePairLastBackfillTime(lastK?.symbol, new Date(lastK.openTime), pair.type);
if (lastBackfillTime === lastK.openTime) {
break;
}
+1 -4
View File
@@ -20,7 +20,4 @@ export async function fetchKlines(
limit = 500,
): Promise<Kline[]> {
return client.fetchKlines(symbol, startTime, limit);
}
console.log(await fetchKlines('BTCUSDT.P', 0, 10));
}
+6 -5
View File
@@ -9,14 +9,14 @@ const repo = AppDataSource.getRepository(Kline);
* 批量 UPSERT K 线数据到 TimescaleDB。
*
* 映射应用层 KlineItem → 数据库实体,通过 INSERT ... ON CONFLICT DO UPDATE
* 实现幂等写入。冲突列为 [exchange, symbol, interval, time]列复合主键),
* 实现幂等写入。冲突列为 [exchange, symbol, type, interval, time]列复合主键),
* 冲突时更新 OHLCV 及扩展字段。
*
* 适用场景:
* - 回补历史 K 线(幂等,重复拉取不产生重复行)
* - WebSocket 实时 K 线增量刷新(更新最新一根未闭合 K 线的 high/low/close/volume
*
* 注意:依赖 Kline 实体的列复合主键 [exchange, symbol, interval, time]。
* 注意:依赖 Kline 实体的列复合主键 [exchange, symbol, type, interval, time]。
* 若实体 PK 结构变更,需同步更新 conflictPaths。
*
* @param KlineItems - 应用层标准化 K 线数组
@@ -35,6 +35,7 @@ export async function upsertOrUpdateKlines(KlineItems: KlineItem[]) {
entity.time = new Date(item.openTime); // Unix ms → Date
entity.exchange = item.exchange;
entity.symbol = item.symbol;
entity.type = item.type;
entity.interval = item.interval;
entity.open = Number(item.open);
entity.high = Number(item.high);
@@ -54,11 +55,11 @@ export async function upsertOrUpdateKlines(KlineItems: KlineItem[]) {
});
try {
// UPSERT: 冲突列匹配复合主键 [exchange, symbol, interval, time]
// 实体已改为列复合 PKON CONFLICT 直接命中主键约束
// UPSERT: 冲突列匹配复合主键 [exchange, symbol, type, interval, time]
// 实体已改为列复合 PKON CONFLICT 直接命中主键约束
// skipUpdateIfNoValuesChanged: 减少不必要的写操作
const result = await repo.upsert(entities, {
conflictPaths: ["exchange", "symbol", "interval", "time"],
conflictPaths: ["exchange", "symbol", "type", "interval", "time"],
skipUpdateIfNoValuesChanged: true,
});
+9 -4
View File
@@ -1,5 +1,6 @@
import { AppDataSource } from "../db/data-source";
import { TradingPair } from "../db/entities/trading-pair.entity";
import type { PairType } from '../types';
const repo = AppDataSource.getRepository(TradingPair);
@@ -18,11 +19,13 @@ export async function getAllPairs() {
* 获取指定交易对的历史 K 线最后补全时间。
*
* @param symbol - 交易对名称(如 "BTCUSDT"
* @param type - 交易对类型(默认 'spot'
* @returns 最后补全时间(UTC),若交易对不存在返回 undefined
*/
export async function getPairLastBackfillTime(symbol: string) {
export async function getPairLastBackfillTime(symbol: string, type: PairType = 'spot') {
const pair = await repo.findOneBy({
symbol
symbol,
type,
});
return pair?.last_backfill_time;
}
@@ -35,11 +38,13 @@ export async function getPairLastBackfillTime(symbol: string) {
*
* @param symbol - 交易对名称(如 "BTCUSDT"
* @param time - 新的最后补全时间(UTC)
* @param type - 交易对类型(默认 'spot'
* @returns 保存后的交易对实体,若交易对不存在返回 undefined
*/
export async function updatePairLastBackfillTime(symbol: string, time: Date) {
export async function updatePairLastBackfillTime(symbol: string, time: Date, type: PairType = 'spot') {
const pair = await repo.findOneBy({
symbol
symbol,
type,
});
if (pair === null) {
return;
+27
View File
@@ -0,0 +1,27 @@
import { MainClient, type Kline as BinanceRestKline, CoinMClient, USDMClient } from "binance";
const usdMClient = new USDMClient();
const coinMClient = new CoinMClient();
console.log(await usdMClient.getKlines({
symbol: 'BTCUSDT',
interval: "1m",
startTime: 0,
limit: 2,
}));
// console.log(await coinMClient.getKlines({
// symbol: 'BTCUSD_PERP',
// interval: "1d",
// startTime: 0,
// limit: 2,
// }));
// const data = await coinMClient.getExchangeInfo();
// for (const item of data.symbols) {
// if (item.pair !== 'BTCUSD') {
// continue;
// }
// console.log(item);
// }
+3 -1
View File
@@ -13,7 +13,7 @@
import type { Observable } from "rxjs";
import type { KlineInterval } from "./kline";
import type { KlineInterval, PairType } from "./kline";
// ============================================================
// 标准化行情数据结构
@@ -97,6 +97,8 @@ export interface Kline {
exchange: string;
/** 交易对符号 */
symbol: string;
/** 交易对类型(spot / um / cm */
type: PairType;
/** K 线周期 */
interval: KlineInterval;
/** 开盘时间(Unix ms */
+2 -1
View File
@@ -1,3 +1,4 @@
export type * from './kline';
export type * from './base';
export * from './base';
export * from './base';
export type { PairType } from './kline';
+3
View File
@@ -1,3 +1,6 @@
/** 交易对类型 */
export type PairType = 'spot' | 'um' | 'cm';
/** K 线周期枚举 */
export type KlineInterval =
| "1m"