refactor(data/exchanges): 多子类架构 + 工厂入口

- 每个交易所独立子类:BinanceRestClient extends BaseRestClient
- 统一入口 createRestClient(exchangeId) 工厂函数
- KLINE_INTERVAL_MS 移至 constants.ts
- fetchKlines 签名统一为 (symbol, startTime, limit?, endTime?)
- throttle 限流实际生效
- convertBinanceKline interval 参数化
- 删除 filterConsecutive 死代码
- 更新 run/exchange.ts 和 service/bnkline.ts 调用方
This commit is contained in:
Rekey
2026-06-16 17:53:36 +08:00
parent b540b7611c
commit 1adb093100
7 changed files with 197 additions and 254 deletions
@@ -85,14 +85,14 @@ export abstract class BaseRestClient {
*
* @param symbol - BTCUSDT
* @param startTime - Unix ms
* @param endTime - Unix ms
* @param limit - config.defaultLimit
* @param endTime - Unix ms
*/
abstract fetchKlines(
symbol: string,
startTime: number,
endTime: number,
limit?: number,
endTime?: number,
): Promise<Kline[]>;
/**
+141
View File
@@ -0,0 +1,141 @@
import { MainClient, type Kline as BinanceRestKline } from "binance";
import { logger } from "../../utils/logger";
import { exchange } from "../../config";
import { BaseRestClient } from "../base";
import type { Kline, MarketInfo, KlineInterval } from "../../types";
// ============================================================
// Binance REST K 线 → 本系统标准化 Kline 转换
// ============================================================
/**
* Binance SDK Kline 元组格式(getKlines / getUIKlines 返回):
* [0] openTime: number — 开盘时间(Unix ms)
* [1] open: numberInString — 开盘价
* [2] high: numberInString — 最高价
* [3] low: numberInString — 最低价
* [4] close: numberInString — 收盘价
* [5] volume: numberInString — 成交量(base 币种)
* [6] closeTime: number — 收盘时间(Unix ms)
* [7] quoteVolume: numberInString — 成交额(quote 币种)
* [8] tradeCount: number — 成交笔数
* [9] takerBuyBaseVol: numberInString — 主动买入成交量
* [10] takerBuyQuoteVol: numberInString — 主动买入成交额
* [11] ignore: numberInString — 忽略字段
*
* numberInString = string | number,通过 String() 统一转换。
*
* 参考:node_modules/binance/lib/types/shared.d.ts:85-98
*/
function convertBinanceKline(
raw: BinanceRestKline,
symbol: string,
interval: KlineInterval,
): Kline {
const [
openTime,
open,
high,
low,
close,
volume,
closeTime,
quoteVolume,
tradeCount,
takerBuyBaseVol,
takerBuyQuoteVol,
// [11] ignore — 丢弃
] = raw;
return {
exchange: "binance",
symbol,
interval,
openTime,
closeTime,
open: String(open),
high: String(high),
low: String(low),
close: String(close),
volume: String(volume),
quoteVolume: String(quoteVolume),
takerBuyBaseVol: String(takerBuyBaseVol),
takerBuyQuoteVol: String(takerBuyQuoteVol),
tradeCount: String(tradeCount),
isClosed: true, // REST 返回的 K 线均为已闭合历史数据
};
}
// ============================================================
// BinanceRestClient
// ============================================================
export class BinanceRestClient extends BaseRestClient {
readonly exchange = "binance";
private client: MainClient;
constructor() {
super();
this.client = new MainClient(
{
api_key: exchange.binance.apiKey,
api_secret: exchange.binance.apiSecret,
},
{ timeout: 3000 },
);
}
/**
* 拉取 1m K 线并转换为本系统标准化 Kline。
*
* Binance 硬限制单次最多 1000 条,超限自动裁切。
* 高周期 K 线通过 TimescaleDB 连续聚合视图生成。
*
* @param symbol - 交易对(如 BTCUSDT
* @param startTime - 起始时间(Unix ms
* @param limit - 单次拉取条数,默认取自 config.defaultLimit
* @param endTime - 结束时间(Unix ms),可选
*/
async fetchKlines(
symbol: string,
startTime: number,
limit?: number,
endTime?: number,
): Promise<Kline[]> {
const effectiveLimit = limit ?? this.config.defaultLimit;
// Binance 硬限制:单次最多 1000 条
const safeLimit = Math.min(effectiveLimit, 1000);
// 限流
await this.throttle(`${symbol}:1m`);
const rawKlines = await this.client.getKlines({
symbol,
interval: "1m",
startTime,
endTime,
limit: safeLimit,
});
logger.debug(
{ symbol, interval: "1m", startTime, endTime, limit: safeLimit },
"[binance] fetchKlines 请求参数",
);
if (!rawKlines || rawKlines.length === 0) {
return [];
}
// 按 openTime 升序排序(防御性),转换为标准化 Kline
return rawKlines
.map((k) => convertBinanceKline(k, symbol, "1m"))
.sort((a, b) => a.openTime - b.openTime);
}
async fetchMarkets(): Promise<MarketInfo[]> {
// TODO: 调用 Binance /exchangeInfo 接口
return [];
}
}
+18
View File
@@ -0,0 +1,18 @@
import type { KlineInterval } from "../types";
/** K 线周期 → 毫秒数映射(用于时间桶计算) */
export const KLINE_INTERVAL_MS: Record<KlineInterval, number> = {
"1m": 60_000,
"3m": 180_000,
"5m": 300_000,
"15m": 900_000,
"30m": 1_800_000,
"1h": 3_600_000,
"2h": 7_200_000,
"4h": 14_400_000,
"6h": 21_600_000,
"8h": 28_800_000,
"1d": 86_400_000,
"1w": 604_800_000,
"1mon": 2_592_000_000,
};
+32
View File
@@ -0,0 +1,32 @@
import { BaseRestClient } from "./base";
import { BinanceRestClient } from "./binance/rest";
/** 交易所 ID 到 RestClient 构造器的注册表 */
const registry: Record<string, () => BaseRestClient> = {
binance: () => new BinanceRestClient(),
};
/**
* 创建交易所 REST 客户端。
*
* 根据交易所 ID 返回对应的 RestClient 实例,
* 外部无需感知具体子类。
*
* @param exchangeId - 交易所标识(如 "binance"
* @returns 对应交易所的 RestClient 实例
* @throws 如果 exchangeId 未注册
*/
export function createRestClient(exchangeId: string): BaseRestClient {
const factory = registry[exchangeId];
if (!factory) {
const supported = Object.keys(registry).join(", ");
throw new Error(
`[exchanges] 不支持的交易所: "${exchangeId}",当前支持: ${supported}`,
);
}
return factory();
}
export { BaseRestClient } from "./base";
export { BinanceRestClient } from "./binance/rest";
export { KLINE_INTERVAL_MS } from "./constants";
-248
View File
@@ -1,248 +0,0 @@
import { MainClient, type Kline as BinanceRestKline, CoinMClient } from "binance";
import { logger } from "../utils/logger";
import { exchange } from "../config";
import { BaseRestClient } from './base_rest';
import type { KlineInterval, Kline, MarketInfo } from '../types';
/** K 线周期 → 毫秒数映射(用于时间桶计算) */
export const KLINE_INTERVAL_MS: Record<KlineInterval, number> = {
"1m": 60_000,
"3m": 180_000,
"5m": 300_000,
"15m": 900_000,
"30m": 1_800_000,
"1h": 3_600_000,
"2h": 7_200_000,
"4h": 14_400_000,
"6h": 21_600_000,
"8h": 28_800_000,
"1d": 86_400_000,
"1w": 604_800_000,
"1mon": 2_592_000_000,
};
// ============================================================
// Binance REST K 线 → 本系统标准化 Kline 转换
// ============================================================
/**
* Binance SDK Kline 元组格式(getKlines / getUIKlines 返回):
* [0] openTime: number — 开盘时间(Unix ms)
* [1] open: numberInString — 开盘价
* [2] high: numberInString — 最高价
* [3] low: numberInString — 最低价
* [4] close: numberInString — 收盘价
* [5] volume: numberInString — 成交量(base 币种)
* [6] closeTime: number — 收盘时间(Unix ms)
* [7] quoteVolume: numberInString — 成交额(quote 币种)
* [8] tradeCount: number — 成交笔数
* [9] takerBuyBaseVol: numberInString — 主动买入成交量
* [10] takerBuyQuoteVol: numberInString — 主动买入成交额
* [11] ignore: numberInString — 忽略字段
*
* numberInString = string | number,通过 Number() 统一转换。
*
* 参考:node_modules/binance/lib/types/shared.d.ts:85-98
*/
function convertBinanceKline(
raw: BinanceRestKline,
symbol: string,
): Kline {
const [
openTime,
open,
high,
low,
close,
volume,
closeTime,
quoteVolume,
tradeCount,
takerBuyBaseVol,
takerBuyQuoteVol,
// [11] ignore — 丢弃
] = raw;
return {
exchange: "binance",
symbol,
interval: "1m",
openTime: openTime,
closeTime: closeTime,
open: String(open),
high: String(high),
low: String(low),
close: String(close),
volume: String(volume),
quoteVolume: String(quoteVolume),
takerBuyBaseVol: String(takerBuyBaseVol),
takerBuyQuoteVol: String(takerBuyQuoteVol),
tradeCount: String(tradeCount),
isClosed: true, // REST 返回的 K 线均为已闭合历史数据
};
}
// ============================================================
// Binance REST 拉取函数
// ============================================================
/**
* 通过 Binance 原生 SDK 拉取 1m K 线并转换为本系统 Kline。
*
* getUIKlines 与 getKlines 返回同构的 Kline[] 元组,
* getUIKlines 额外支持 timeZone 参数,适合按交易所时区对齐。
*
* @param symbol - 交易对(如 BTCUSDT
* @param startTime - 起始时间(Unix ms
* @param endTime - 结束时间(Unix ms),可选
* @param limit - 单次拉取条数,默认 500(最大 1000)
*/
async function fetchBinanceKlines(
symbol: string,
startTime: number,
endTime?: number,
limit = 500,
): Promise<Kline[]> {
const client = new MainClient({
api_key: exchange.binance.apiKey,
api_secret: exchange.binance.apiSecret,
}, {
timeout: 3000,
});
// Binance 硬限制:单次最多 1000 条
const safeLimit = Math.min(limit, 1000);
const rawKlines = await client.getKlines({
symbol,
interval: "1m",
startTime,
endTime,
limit: safeLimit,
});
logger.debug({
symbol,
interval: "1m",
startTime,
endTime,
limit: safeLimit,
}, 'fetchBinanceKlines arguments');
if (!rawKlines || rawKlines.length === 0) {
return [];
}
return filterConsecutive(
rawKlines.map((k) => convertBinanceKline(k, symbol)),
);
}
/**
* 过滤出严格连续(无时间缺口)的 K 线序列。
*
* 处理流程:
* 1. 按 openTime 升序排序(防御性,确保时间单调递增)
* 2. 从首条 K 线开始遍历,仅保留相邻间隔恰好等于 intervalMs 的条目
* 3. 一旦检测到缺口(间隔 ≠ intervalMs),立即终止并丢弃后续所有数据
*
* 设计意图:
* - 时间序列分析(回测、指标计算)依赖连续数据,缺口会引入偏误
* - 缺口之后的数据可能来自另一段不连续的拉取结果,混入后风险更高
* - "截断"策略优于"填充/跳过",避免伪造数据或隐藏数据质量问题
*
* @param klines - 待过滤的 K 线数组(可能乱序、可能含缺口)
* @param interval - K 线周期,用于查表获取 intervalMs
* @returns 从首条开始严格连续的最大前缀子序列;空数组无缺口时返回完整排序结果
*/
function filterConsecutive(klines: Kline[]) {
// 防御性排序:Binance API 不保证返回顺序,升序排列确保时间单调
const results = klines.sort((a: Kline, b: Kline) => {
return a.openTime - b.openTime;
});
return results;
// console.log(results);
// let _openTime = 0; // 哨兵:0 表示尚未初始化,非 0 表示上一条已收录 K 线的 openTime
// const rets: Kline[] = []; // 累积连续 K 线结果
// for (let item of results) {
// console.log(item.openTime);
// // 分支 1 —— 首条 K 线:无条件收录,并初始化哨兵
// if (_openTime === 0) {
// _openTime = item.openTime;
// rets.push(item);
// continue;
// }
// // 分支 2 —— 严格连续:当前 openTime 与上一条恰好相差一个周期
// if (item.openTime - _openTime === intervalMs) {
// _openTime = item.openTime;
// rets.push(item);
// continue;
// }
// // 分支 3 —— 检测到缺口:截断,丢弃当前及之后所有 K 线
// break;
// }
// return rets;
}
// ============================================================
// Client —— 多交易所 REST 客户端
// ============================================================
export class Client extends BaseRestClient {
exchange: string;
/**
* @param exchange - 交易所 ID(如 "binance"、"okx"、"bybit"
* 内部根据 ID 分发到对应的 SDK 实现
*/
constructor(exchange: string) {
super();
this.exchange = exchange;
}
/**
* 拉取 1m 历史 K 线数据,返回标准化 Kline 数组。
*
* 根据交易所 ID 分发到各自的 SDK 拉取函数。
* K 线周期固定为 1m,高周期通过 TimescaleDB 连续聚合视图生成。
*
* @param symbol - 交易对符号(如 BTCUSDT
* @param startTime - 起始时间(Unix ms
* @param endTime - 结束时间(Unix ms),可选
* @param limit - 最大返回条数,默认取自 config.defaultLimit
*/
async fetchKlines(
symbol: string,
startTime: number,
limit?: number,
endTime?: number,
): Promise<Kline[]> {
const effectiveLimit = limit ?? this.config.defaultLimit;
switch (this.exchange) {
case "binance":
return fetchBinanceKlines(symbol, startTime, endTime, effectiveLimit);
// TODO: 新增交易所在此添加 case
// case "okx":
// return fetchOkxKlines(symbol, interval, startTime, endTime, effectiveLimit);
default:
throw new Error(
`[Client] 不支持的交易所: "${this.exchange}"` +
`当前仅支持: binance`,
);
}
}
async fetchMarkets(): Promise<MarketInfo[]> {
// TODO: 各交易所实现
return [];
}
}
+2 -2
View File
@@ -1,7 +1,7 @@
import { logger } from "../utils/logger";
import { getAllPairs, updatePairLastBackfillTime } from '../service/pair';
import { upsertOrUpdateKlines } from "../service/kline";
import { Client } from '../exchanges/rest';
import { createRestClient } from '../exchanges';
function getNowMinuteMS() {
const minuteMS = 1000 * 60;
@@ -11,7 +11,7 @@ function getNowMinuteMS() {
const allPairs = await getAllPairs();
for (const pair of allPairs) {
const client = new Client("binance");
const client = createRestClient("binance");
let lastBackfillTime = pair.last_backfill_time.getTime();
try {
while (lastBackfillTime < getNowMinuteMS()) {
+2 -2
View File
@@ -1,7 +1,7 @@
import { Client } from "../exchanges/rest";
import { createRestClient } from "../exchanges";
import type { Kline } from "../types";
const client = new Client("binance");
const client = createRestClient("binance");
/**
* 获取 Binance 1m K 线数据(基于 MainClient REST API)。