From 1adb0931000b8c978746ed505936b70de591a47a Mon Sep 17 00:00:00 2001 From: Rekey Date: Tue, 16 Jun 2026 17:53:36 +0800 Subject: [PATCH] =?UTF-8?q?refactor(data/exchanges):=20=E5=A4=9A=E5=AD=90?= =?UTF-8?q?=E7=B1=BB=E6=9E=B6=E6=9E=84=20+=20=E5=B7=A5=E5=8E=82=E5=85=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 每个交易所独立子类:BinanceRestClient extends BaseRestClient - 统一入口 createRestClient(exchangeId) 工厂函数 - KLINE_INTERVAL_MS 移至 constants.ts - fetchKlines 签名统一为 (symbol, startTime, limit?, endTime?) - throttle 限流实际生效 - convertBinanceKline interval 参数化 - 删除 filterConsecutive 死代码 - 更新 run/exchange.ts 和 service/bnkline.ts 调用方 --- data/exchanges/{base_rest.ts => base.ts} | 4 +- data/exchanges/binance/rest.ts | 141 +++++++++++++ data/exchanges/constants.ts | 18 ++ data/exchanges/index.ts | 32 +++ data/exchanges/rest.ts | 248 ----------------------- data/run/exchange.ts | 4 +- data/service/bnkline.ts | 4 +- 7 files changed, 197 insertions(+), 254 deletions(-) rename data/exchanges/{base_rest.ts => base.ts} (97%) create mode 100644 data/exchanges/binance/rest.ts create mode 100644 data/exchanges/constants.ts create mode 100644 data/exchanges/index.ts delete mode 100644 data/exchanges/rest.ts diff --git a/data/exchanges/base_rest.ts b/data/exchanges/base.ts similarity index 97% rename from data/exchanges/base_rest.ts rename to data/exchanges/base.ts index 1f007d9..38a653f 100644 --- a/data/exchanges/base_rest.ts +++ b/data/exchanges/base.ts @@ -85,14 +85,14 @@ export abstract class BaseRestClient { * * @param symbol - 交易对符号(如 BTCUSDT) * @param startTime - 起始时间(Unix ms) - * @param endTime - 结束时间(Unix ms) * @param limit - 单次最大条数(默认取自 config.defaultLimit) + * @param endTime - 结束时间(Unix ms),可选 */ abstract fetchKlines( symbol: string, startTime: number, - endTime: number, limit?: number, + endTime?: number, ): Promise; /** diff --git a/data/exchanges/binance/rest.ts b/data/exchanges/binance/rest.ts new file mode 100644 index 0000000..04100c9 --- /dev/null +++ b/data/exchanges/binance/rest.ts @@ -0,0 +1,141 @@ +import { MainClient, type Kline as BinanceRestKline } from "binance"; + +import { logger } from "../../utils/logger"; +import { exchange } from "../../config"; +import { BaseRestClient } from "../base"; +import type { Kline, MarketInfo, KlineInterval } from "../../types"; + +// ============================================================ +// Binance REST K 线 → 本系统标准化 Kline 转换 +// ============================================================ + +/** + * Binance SDK Kline 元组格式(getKlines / getUIKlines 返回): + * [0] openTime: number — 开盘时间(Unix ms) + * [1] open: numberInString — 开盘价 + * [2] high: numberInString — 最高价 + * [3] low: numberInString — 最低价 + * [4] close: numberInString — 收盘价 + * [5] volume: numberInString — 成交量(base 币种) + * [6] closeTime: number — 收盘时间(Unix ms) + * [7] quoteVolume: numberInString — 成交额(quote 币种) + * [8] tradeCount: number — 成交笔数 + * [9] takerBuyBaseVol: numberInString — 主动买入成交量 + * [10] takerBuyQuoteVol: numberInString — 主动买入成交额 + * [11] ignore: numberInString — 忽略字段 + * + * numberInString = string | number,通过 String() 统一转换。 + * + * 参考:node_modules/binance/lib/types/shared.d.ts:85-98 + */ +function convertBinanceKline( + raw: BinanceRestKline, + symbol: string, + interval: KlineInterval, +): Kline { + const [ + openTime, + open, + high, + low, + close, + volume, + closeTime, + quoteVolume, + tradeCount, + takerBuyBaseVol, + takerBuyQuoteVol, + // [11] ignore — 丢弃 + ] = raw; + + return { + exchange: "binance", + symbol, + interval, + openTime, + closeTime, + open: String(open), + high: String(high), + low: String(low), + close: String(close), + volume: String(volume), + quoteVolume: String(quoteVolume), + takerBuyBaseVol: String(takerBuyBaseVol), + takerBuyQuoteVol: String(takerBuyQuoteVol), + tradeCount: String(tradeCount), + isClosed: true, // REST 返回的 K 线均为已闭合历史数据 + }; +} + +// ============================================================ +// BinanceRestClient +// ============================================================ + +export class BinanceRestClient extends BaseRestClient { + readonly exchange = "binance"; + + private client: MainClient; + + constructor() { + super(); + this.client = new MainClient( + { + api_key: exchange.binance.apiKey, + api_secret: exchange.binance.apiSecret, + }, + { timeout: 3000 }, + ); + } + + /** + * 拉取 1m K 线并转换为本系统标准化 Kline。 + * + * Binance 硬限制单次最多 1000 条,超限自动裁切。 + * 高周期 K 线通过 TimescaleDB 连续聚合视图生成。 + * + * @param symbol - 交易对(如 BTCUSDT) + * @param startTime - 起始时间(Unix ms) + * @param limit - 单次拉取条数,默认取自 config.defaultLimit + * @param endTime - 结束时间(Unix ms),可选 + */ + async fetchKlines( + symbol: string, + startTime: number, + limit?: number, + endTime?: number, + ): Promise { + const effectiveLimit = limit ?? this.config.defaultLimit; + // Binance 硬限制:单次最多 1000 条 + const safeLimit = Math.min(effectiveLimit, 1000); + + // 限流 + await this.throttle(`${symbol}:1m`); + + const rawKlines = await this.client.getKlines({ + symbol, + interval: "1m", + startTime, + endTime, + limit: safeLimit, + }); + + logger.debug( + { symbol, interval: "1m", startTime, endTime, limit: safeLimit }, + "[binance] fetchKlines 请求参数", + ); + + if (!rawKlines || rawKlines.length === 0) { + return []; + } + + // 按 openTime 升序排序(防御性),转换为标准化 Kline + return rawKlines + .map((k) => convertBinanceKline(k, symbol, "1m")) + .sort((a, b) => a.openTime - b.openTime); + } + + async fetchMarkets(): Promise { + // TODO: 调用 Binance /exchangeInfo 接口 + return []; + } +} diff --git a/data/exchanges/constants.ts b/data/exchanges/constants.ts new file mode 100644 index 0000000..999937b --- /dev/null +++ b/data/exchanges/constants.ts @@ -0,0 +1,18 @@ +import type { KlineInterval } from "../types"; + +/** K 线周期 → 毫秒数映射(用于时间桶计算) */ +export const KLINE_INTERVAL_MS: Record = { + "1m": 60_000, + "3m": 180_000, + "5m": 300_000, + "15m": 900_000, + "30m": 1_800_000, + "1h": 3_600_000, + "2h": 7_200_000, + "4h": 14_400_000, + "6h": 21_600_000, + "8h": 28_800_000, + "1d": 86_400_000, + "1w": 604_800_000, + "1mon": 2_592_000_000, +}; diff --git a/data/exchanges/index.ts b/data/exchanges/index.ts new file mode 100644 index 0000000..98a4c11 --- /dev/null +++ b/data/exchanges/index.ts @@ -0,0 +1,32 @@ +import { BaseRestClient } from "./base"; +import { BinanceRestClient } from "./binance/rest"; + +/** 交易所 ID 到 RestClient 构造器的注册表 */ +const registry: Record BaseRestClient> = { + binance: () => new BinanceRestClient(), +}; + +/** + * 创建交易所 REST 客户端。 + * + * 根据交易所 ID 返回对应的 RestClient 实例, + * 外部无需感知具体子类。 + * + * @param exchangeId - 交易所标识(如 "binance") + * @returns 对应交易所的 RestClient 实例 + * @throws 如果 exchangeId 未注册 + */ +export function createRestClient(exchangeId: string): BaseRestClient { + const factory = registry[exchangeId]; + if (!factory) { + const supported = Object.keys(registry).join(", "); + throw new Error( + `[exchanges] 不支持的交易所: "${exchangeId}",当前支持: ${supported}`, + ); + } + return factory(); +} + +export { BaseRestClient } from "./base"; +export { BinanceRestClient } from "./binance/rest"; +export { KLINE_INTERVAL_MS } from "./constants"; diff --git a/data/exchanges/rest.ts b/data/exchanges/rest.ts deleted file mode 100644 index e709dba..0000000 --- a/data/exchanges/rest.ts +++ /dev/null @@ -1,248 +0,0 @@ -import { MainClient, type Kline as BinanceRestKline, CoinMClient } from "binance"; - -import { logger } from "../utils/logger"; -import { exchange } from "../config"; -import { BaseRestClient } from './base_rest'; -import type { KlineInterval, Kline, MarketInfo } from '../types'; - -/** K 线周期 → 毫秒数映射(用于时间桶计算) */ -export const KLINE_INTERVAL_MS: Record = { - "1m": 60_000, - "3m": 180_000, - "5m": 300_000, - "15m": 900_000, - "30m": 1_800_000, - "1h": 3_600_000, - "2h": 7_200_000, - "4h": 14_400_000, - "6h": 21_600_000, - "8h": 28_800_000, - "1d": 86_400_000, - "1w": 604_800_000, - "1mon": 2_592_000_000, -}; - -// ============================================================ -// Binance REST K 线 → 本系统标准化 Kline 转换 -// ============================================================ - -/** - * Binance SDK Kline 元组格式(getKlines / getUIKlines 返回): - * [0] openTime: number — 开盘时间(Unix ms) - * [1] open: numberInString — 开盘价 - * [2] high: numberInString — 最高价 - * [3] low: numberInString — 最低价 - * [4] close: numberInString — 收盘价 - * [5] volume: numberInString — 成交量(base 币种) - * [6] closeTime: number — 收盘时间(Unix ms) - * [7] quoteVolume: numberInString — 成交额(quote 币种) - * [8] tradeCount: number — 成交笔数 - * [9] takerBuyBaseVol: numberInString — 主动买入成交量 - * [10] takerBuyQuoteVol: numberInString — 主动买入成交额 - * [11] ignore: numberInString — 忽略字段 - * - * numberInString = string | number,通过 Number() 统一转换。 - * - * 参考:node_modules/binance/lib/types/shared.d.ts:85-98 - */ -function convertBinanceKline( - raw: BinanceRestKline, - symbol: string, -): Kline { - const [ - openTime, - open, - high, - low, - close, - volume, - closeTime, - quoteVolume, - tradeCount, - takerBuyBaseVol, - takerBuyQuoteVol, - // [11] ignore — 丢弃 - ] = raw; - - return { - exchange: "binance", - symbol, - interval: "1m", - openTime: openTime, - closeTime: closeTime, - open: String(open), - high: String(high), - low: String(low), - close: String(close), - volume: String(volume), - quoteVolume: String(quoteVolume), - takerBuyBaseVol: String(takerBuyBaseVol), - takerBuyQuoteVol: String(takerBuyQuoteVol), - tradeCount: String(tradeCount), - isClosed: true, // REST 返回的 K 线均为已闭合历史数据 - }; -} - -// ============================================================ -// Binance REST 拉取函数 -// ============================================================ - -/** - * 通过 Binance 原生 SDK 拉取 1m K 线并转换为本系统 Kline。 - * - * getUIKlines 与 getKlines 返回同构的 Kline[] 元组, - * getUIKlines 额外支持 timeZone 参数,适合按交易所时区对齐。 - * - * @param symbol - 交易对(如 BTCUSDT) - * @param startTime - 起始时间(Unix ms) - * @param endTime - 结束时间(Unix ms),可选 - * @param limit - 单次拉取条数,默认 500(最大 1000) - */ -async function fetchBinanceKlines( - symbol: string, - startTime: number, - endTime?: number, - limit = 500, -): Promise { - const client = new MainClient({ - api_key: exchange.binance.apiKey, - api_secret: exchange.binance.apiSecret, - }, { - timeout: 3000, - }); - - // Binance 硬限制:单次最多 1000 条 - const safeLimit = Math.min(limit, 1000); - - const rawKlines = await client.getKlines({ - symbol, - interval: "1m", - startTime, - endTime, - limit: safeLimit, - }); - - logger.debug({ - symbol, - interval: "1m", - startTime, - endTime, - limit: safeLimit, - }, 'fetchBinanceKlines arguments'); - - if (!rawKlines || rawKlines.length === 0) { - return []; - } - - return filterConsecutive( - rawKlines.map((k) => convertBinanceKline(k, symbol)), - ); -} - -/** - * 过滤出严格连续(无时间缺口)的 K 线序列。 - * - * 处理流程: - * 1. 按 openTime 升序排序(防御性,确保时间单调递增) - * 2. 从首条 K 线开始遍历,仅保留相邻间隔恰好等于 intervalMs 的条目 - * 3. 一旦检测到缺口(间隔 ≠ intervalMs),立即终止并丢弃后续所有数据 - * - * 设计意图: - * - 时间序列分析(回测、指标计算)依赖连续数据,缺口会引入偏误 - * - 缺口之后的数据可能来自另一段不连续的拉取结果,混入后风险更高 - * - "截断"策略优于"填充/跳过",避免伪造数据或隐藏数据质量问题 - * - * @param klines - 待过滤的 K 线数组(可能乱序、可能含缺口) - * @param interval - K 线周期,用于查表获取 intervalMs - * @returns 从首条开始严格连续的最大前缀子序列;空数组无缺口时返回完整排序结果 - */ -function filterConsecutive(klines: Kline[]) { - // 防御性排序:Binance API 不保证返回顺序,升序排列确保时间单调 - const results = klines.sort((a: Kline, b: Kline) => { - return a.openTime - b.openTime; - }); - - return results; - - // console.log(results); - - // let _openTime = 0; // 哨兵:0 表示尚未初始化,非 0 表示上一条已收录 K 线的 openTime - // const rets: Kline[] = []; // 累积连续 K 线结果 - - // for (let item of results) { - - // console.log(item.openTime); - // // 分支 1 —— 首条 K 线:无条件收录,并初始化哨兵 - // if (_openTime === 0) { - // _openTime = item.openTime; - // rets.push(item); - // continue; - // } - - // // 分支 2 —— 严格连续:当前 openTime 与上一条恰好相差一个周期 - // if (item.openTime - _openTime === intervalMs) { - // _openTime = item.openTime; - // rets.push(item); - // continue; - // } - - // // 分支 3 —— 检测到缺口:截断,丢弃当前及之后所有 K 线 - // break; - // } - - // return rets; -} - -// ============================================================ -// Client —— 多交易所 REST 客户端 -// ============================================================ - -export class Client extends BaseRestClient { - exchange: string; - - /** - * @param exchange - 交易所 ID(如 "binance"、"okx"、"bybit") - * 内部根据 ID 分发到对应的 SDK 实现 - */ - constructor(exchange: string) { - super(); - this.exchange = exchange; - } - - /** - * 拉取 1m 历史 K 线数据,返回标准化 Kline 数组。 - * - * 根据交易所 ID 分发到各自的 SDK 拉取函数。 - * K 线周期固定为 1m,高周期通过 TimescaleDB 连续聚合视图生成。 - * - * @param symbol - 交易对符号(如 BTCUSDT) - * @param startTime - 起始时间(Unix ms) - * @param endTime - 结束时间(Unix ms),可选 - * @param limit - 最大返回条数,默认取自 config.defaultLimit - */ - async fetchKlines( - symbol: string, - startTime: number, - limit?: number, - endTime?: number, - ): Promise { - const effectiveLimit = limit ?? this.config.defaultLimit; - switch (this.exchange) { - case "binance": - return fetchBinanceKlines(symbol, startTime, endTime, effectiveLimit); - // TODO: 新增交易所在此添加 case - // case "okx": - // return fetchOkxKlines(symbol, interval, startTime, endTime, effectiveLimit); - default: - throw new Error( - `[Client] 不支持的交易所: "${this.exchange}",` + - `当前仅支持: binance`, - ); - } - } - - async fetchMarkets(): Promise { - // TODO: 各交易所实现 - return []; - } -} diff --git a/data/run/exchange.ts b/data/run/exchange.ts index 2af76e2..0a91388 100644 --- a/data/run/exchange.ts +++ b/data/run/exchange.ts @@ -1,7 +1,7 @@ import { logger } from "../utils/logger"; import { getAllPairs, updatePairLastBackfillTime } from '../service/pair'; import { upsertOrUpdateKlines } from "../service/kline"; -import { Client } from '../exchanges/rest'; +import { createRestClient } from '../exchanges'; function getNowMinuteMS() { const minuteMS = 1000 * 60; @@ -11,7 +11,7 @@ function getNowMinuteMS() { const allPairs = await getAllPairs(); for (const pair of allPairs) { - const client = new Client("binance"); + const client = createRestClient("binance"); let lastBackfillTime = pair.last_backfill_time.getTime(); try { while (lastBackfillTime < getNowMinuteMS()) { diff --git a/data/service/bnkline.ts b/data/service/bnkline.ts index a51dcde..aeb8bcd 100644 --- a/data/service/bnkline.ts +++ b/data/service/bnkline.ts @@ -1,7 +1,7 @@ -import { Client } from "../exchanges/rest"; +import { createRestClient } from "../exchanges"; import type { Kline } from "../types"; -const client = new Client("binance"); +const client = createRestClient("binance"); /** * 获取 Binance 1m K 线数据(基于 MainClient REST API)。