refactor(data): 重构交易所适配器,修复 Kline 实体复合主键

- 废弃旧 adapter 体系 (base/binance/types.ts),新增 base_rest/rest.ts
  基于 Binance 官方 SDK 实现 REST K 线拉取
- Kline 实体改为四列复合主键 (exchange/symbol/interval/time),
  修复单列 time PK 导致的跨 symbol 写入冲突
- 新增 filterConsecutive():K 线连续性过滤,首缺口截断策略
- 新增 service/kline.ts:批量 UPSERT K 线入库
- 新增 types/ 共享类型定义、example/ 示例、run/ 运行脚本
This commit is contained in:
Rekey
2026-06-08 18:18:16 +08:00
parent 85a0031a78
commit 5e385547c7
16 changed files with 829 additions and 1043 deletions
-186
View File
@@ -1,186 +0,0 @@
// ============================================================
// base.ts — 交易所适配器抽象基类
// ============================================================
// 所有交易所适配器(Binance / OKX / Bybit ...)继承此类,
// 复用指数退避重连、连接状态管理、限流等通用逻辑。
//
// 子类只需实现:
// - connect() — 建立 WebSocket/REST 连接
// - disconnect() — 断开连接并清理资源
// - subscribeTicker() / subscribeTrade() / subscribeOrderbook()
// - fetchKlines() — REST 历史 K 线补拉
// - fetchMarkets() — 交易对元数据拉取
// ============================================================
import { Subject, type Observable } from "rxjs";
import { logger } from "../utils/logger";
import type {
MarketDataFeed,
Ticker,
Trade,
OrderBook,
Kline,
KlineInterval,
MarketInfo,
ConnectionState,
AdapterConfig,
} from "./types";
import { DEFAULT_ADAPTER_CONFIG } from "./types";
// ============================================================
// 工具:异步 sleep
// ============================================================
/** 返回一个在 ms 毫秒后 resolve 的 Promise */
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// ============================================================
// BaseExchangeAdapter
// ============================================================
export abstract class BaseExchangeAdapter implements MarketDataFeed {
/** 交易所标识(子类必须覆盖) */
abstract readonly exchange: string;
/** 适配器配置(可在子类构造函数中覆盖默认值) */
protected readonly config: AdapterConfig;
/** 当前连接状态 */
protected _connectionState: ConnectionState = "disconnected";
/** 当前重连尝试次数(成功连接后重置) */
protected reconnectAttempt = 0;
/** Subject 清理注册表 —— disconnect 时统一 complete */
protected activeSubjects = new Set<Subject<unknown>>();
// ============================================================
// 构造函数
// ============================================================
constructor(config: Partial<AdapterConfig> = {}) {
this.config = { ...DEFAULT_ADAPTER_CONFIG, ...config };
}
// ============================================================
// 连接状态(只读暴露)
// ============================================================
get connectionState(): ConnectionState {
return this._connectionState;
}
/** 更新连接状态并记录日志 */
protected setConnectionState(state: ConnectionState): void {
const prev = this._connectionState;
this._connectionState = state;
if (prev !== state) {
logger.info(
{ exchange: this.exchange, from: prev, to: state },
`[${this.exchange}] connection state: ${prev}${state}`,
);
}
}
// ============================================================
// 指数退避重连(所有子类复用)
// ============================================================
/**
* 执行指数退避重连。
*
* 延迟公式:delay = baseDelay × 2^min(attempt, 5)
* - attempt=0: 3s
* - attempt=1: 6s
* - attempt=2: 12s
* - attempt=5: 96s(之后不再翻倍)
*
* 超过 maxReconnectAttempts 后抛出错误。
*
* @throws 达到最大重试次数后抛出
*/
protected async reconnect(): Promise<void> {
const { reconnectBaseDelayMs: baseDelay, maxReconnectAttempts } = this.config;
if (this.reconnectAttempt >= maxReconnectAttempts) {
this.setConnectionState("error");
throw new Error(
`[${this.exchange}] 重连失败:已达最大重试次数 (${maxReconnectAttempts})`,
);
}
const cappedAttempt = Math.min(this.reconnectAttempt, 5);
const delay = baseDelay * Math.pow(2, cappedAttempt);
logger.warn(
{
exchange: this.exchange,
attempt: this.reconnectAttempt + 1,
maxAttempts: maxReconnectAttempts,
delayMs: delay,
},
`[${this.exchange}] WebSocket 重连中...`,
);
await sleep(delay);
this.reconnectAttempt++;
this.setConnectionState("connecting");
await this.connect();
}
/** 成功连接后重置重连计数器 */
protected resetReconnectAttempts(): void {
if (this.reconnectAttempt > 0) {
logger.info(
{ exchange: this.exchange, attempts: this.reconnectAttempt },
`[${this.exchange}] 重连成功,计数器重置`,
);
}
this.reconnectAttempt = 0;
}
// ============================================================
// Subject 管理工具
// ============================================================
/**
* 创建一个受管理的 Subjectdisconnect 时自动 complete。
* 子类在 subscribe* 方法中使用此工具创建 Subject。
*/
protected createManagedSubject<T>(): Subject<T> {
const subject = new Subject<T>();
this.activeSubjects.add(subject as Subject<unknown>);
return subject;
}
/** 完成所有受管理的 Subjectdisconnect 时调用) */
protected completeAllSubjects(): void {
for (const subject of this.activeSubjects) {
subject.complete();
}
this.activeSubjects.clear();
}
// ============================================================
// 抽象方法 —— 子类必须实现
// ============================================================
abstract connect(): Promise<void>;
abstract disconnect(): Promise<void>;
abstract subscribeTicker(symbols: string[]): Observable<Ticker>;
abstract subscribeTrade(symbols: string[]): Observable<Trade>;
abstract subscribeOrderbook(symbol: string, depth?: number): Observable<OrderBook>;
abstract fetchKlines(
symbol: string,
interval: KlineInterval,
startTime: number,
endTime: number,
limit?: number,
): Promise<Kline[]>;
abstract fetchMarkets(): Promise<MarketInfo[]>;
}
export default BaseExchangeAdapter;
+109
View File
@@ -0,0 +1,109 @@
// ============================================================
// base.ts — REST 客户端抽象基类
// ============================================================
// 各交易所适配器继承此类,复用 REST 请求限流、重试等通用逻辑。
// 子类通过注入各交易所原生 SDKbinance / okx / bybit ...
// 实现具体的数据拉取,无需依赖 ccxt。
//
// 子类只需实现:
// - fetchKlines() — 历史 K 线拉取(基于目标交易所 SDK)
// - fetchMarkets() — 交易对元数据拉取(用于自动注册交易对)
//
// WebSocket 实时行情由各适配器自行管理,不在此基类范围内。
// ============================================================
import { logger } from "../utils/logger";
import type { Kline, KlineInterval, MarketInfo, RestClientConfig } from "../types";
import { DEFAULT_REST_CONFIG } from "../types/base";
// ============================================================
// 工具:异步 sleep
// ============================================================
/** 返回一个在 ms 毫秒后 resolve 的 Promise */
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// ============================================================
// BaseRestClient
// ============================================================
export abstract class BaseRestClient {
/** 交易所标识(子类必须覆盖) */
abstract readonly exchange: string;
/** REST 客户端配置(可在子类构造函数中覆盖默认值) */
protected readonly config: RestClientConfig;
/** REST 请求节流 Mapkey → 上次请求时间戳) */
private lastRestFetch = new Map<string, number>();
// ============================================================
// 构造函数
// ============================================================
constructor(config: Partial<RestClientConfig> = {}) {
this.config = { ...DEFAULT_REST_CONFIG, ...config };
}
// ============================================================
// 限流工具
// ============================================================
/**
* 对同一 key 的 REST 请求进行冷却节流。
* 若距离上次请求不足 restRateLimitMs,自动等待剩余时间。
*
* @param key - 节流标识(如 `${symbol}:${interval}`
*/
protected async throttle(key: string): Promise<void> {
const lastFetch = this.lastRestFetch.get(key) ?? 0;
const elapsed = Date.now() - lastFetch;
if (elapsed < this.config.restRateLimitMs) {
const waitMs = this.config.restRateLimitMs - elapsed;
logger.debug(
{ exchange: this.exchange, key, waitMs },
`[${this.exchange}] REST 限流等待 ${waitMs}ms`,
);
await sleep(waitMs);
}
this.lastRestFetch.set(key, Date.now());
}
// ============================================================
// 抽象方法 —— 子类必须使用各自的交易所 SDK 实现
// ============================================================
/**
* 拉取历史 K 线数据(REST)。
*
* 子类负责:
* 1. 调用交易所原生 SDK 的 K 线接口
* 2. 将原始数据转换为本系统标准化 Kline 结构
* 3. 处理分页逻辑(若时间跨度超过单次请求上限)
*
* @param symbol - 交易对符号(如 BTCUSDT
* @param interval - K 线周期
* @param startTime - 起始时间(Unix ms
* @param endTime - 结束时间(Unix ms
* @param limit - 单次最大条数(默认取自 config.defaultLimit
*/
abstract fetchKlines(
symbol: string,
interval: KlineInterval,
startTime: number,
endTime: number,
limit?: number,
): Promise<Kline[]>;
/**
* 获取交易所交易对信息(REST)。
*
* 子类负责调用交易所原生 SDK 的 /exchangeInfo 等价接口,
* 并转换为本系统标准化 MarketInfo 结构。
*/
abstract fetchMarkets(): Promise<MarketInfo[]>;
}
export default BaseRestClient;
-782
View File
@@ -1,782 +0,0 @@
// ============================================================
// binance.ts — Binance 交易所适配器
// ============================================================
// 基于 Binance 官方 SDKbinance@3.x)实现 MarketDataFeed 接口。
//
// WebSocket:使用 SDK 内置 WebsocketClient,自动处理多路复用、
// 断线重连、心跳保活。通过 formattedMessage 事件接收已解析的
// 类型化行情数据,转换为本系统标准化结构后通过 RxJS Subject 发布。
//
// REST:使用 SDK 内置 MainClientSpot),用于:
// - fetchKlines() 历史 K 线补拉
// - fetchMarkets() 交易对元数据(用于自动注册到 trading_pairs 表)
//
// ============================================================
// 风险提示:
// - Binance WebSocket 单连接最多订阅 1024 个 stream
// 超出需拆分多连接(SDK 自动处理)
// - 生产环境建议使用 Binance 的 combined streams 合并请求
// - REST API 限频:1200 请求/分钟(权重制),fetchKlines 权重 2
// ============================================================
import { Subject, type Observable } from "rxjs";
import {
WebsocketClient,
MainClient,
type WsMessageKlineFormatted,
type WsMessageTradeFormatted,
type WsMessage24hrTickerFormatted,
type WsMessageBookTickerEventFormatted,
type WsMessagePartialBookDepthEventFormatted,
type WsFormattedMessage,
} from "binance";
import type { Kline as BinanceRestKline } from "binance";
import { BaseExchangeAdapter } from "./base";
import { logger } from "../utils/logger";
import type {
Ticker,
Trade,
OrderBook,
Kline,
KlineInterval,
MarketInfo,
AdapterConfig,
BinanceRawKline,
} from "./types";
import { KLINE_INTERVAL_MS } from "./types";
// ============================================================
// Binance K 线周期 ← → 本系统 K 线周期映射
// ============================================================
/**
* Binance SDK 支持的 K 线周期(比本系统更多)。
* 本系统仅使用其中的子集,其余周期由 pipeline 合成。
*/
type BinanceKlineInterval =
| "1m"
| "5m"
| "15m"
| "30m"
| "1h"
| "4h"
| "1d"
| "1w";
/** 本系统 KlineInterval → Binance SDK KlineInterval1:1 子集映射) */
const INTERVAL_TO_BINANCE: Record<KlineInterval, BinanceKlineInterval> = {
"1m": "1m",
"5m": "5m",
"15m": "15m",
"30m": "30m",
"1h": "1h",
"4h": "4h",
"1d": "1d",
"1w": "1w",
};
// ============================================================
// 默认适配器配置(Binance 专用覆盖)
// ============================================================
const DEFAULT_BINANCE_CONFIG: AdapterConfig = {
reconnectBaseDelayMs: 3000,
maxReconnectAttempts: 10,
/** Binance REST API 权重制限频,保守设为 250ms */
restRateLimitMs: 250,
};
// ============================================================
// BinanceAdapter
// ============================================================
export class BinanceAdapter extends BaseExchangeAdapter {
readonly exchange = "binance";
// ----------------------------------------------------------
// SDK 客户端实例
// ----------------------------------------------------------
/** Binance WebSocket 客户端(内置多路复用 + 自动重连) */
private wsClient!: WebsocketClient;
/** Binance REST 客户端(Spot */
private restClient!: MainClient;
// ----------------------------------------------------------
// RxJS Subject —— 按事件类型分频道发布
// ----------------------------------------------------------
/** 24h Ticker 流(合并所有已订阅 symbol */
private tickerSubject!: Subject<Ticker>;
/** 逐笔成交流(合并所有已订阅 symbol) */
private tradeSubject!: Subject<Trade>;
/** 订单簿深度流(合并所有已订阅 symbol) */
private orderbookSubjects = new Map<string, Subject<OrderBook>>();
// ----------------------------------------------------------
// 订阅追踪
// ----------------------------------------------------------
/** 当前已订阅的 ticker symbol 集合 */
private subscribedTickerSymbols = new Set<string>();
/** 当前已订阅的 trade symbol 集合 */
private subscribedTradeSymbols = new Set<string>();
/** 当前已订阅的 orderbook symbol → depth 映射 */
private subscribedOrderbookDepths = new Map<string, number>();
/** 防止重复 REST 请求的节流 Mapsymbol:interval → lastFetchTime */
private lastRestFetch = new Map<string, number>();
// ============================================================
// 构造函数
// ============================================================
constructor(config: Partial<AdapterConfig> = {}) {
super({ ...DEFAULT_BINANCE_CONFIG, ...config });
}
// ============================================================
// 连接管理
// ============================================================
/**
* 建立 WebSocket 连接并注册事件监听。
*
* Binance SDK 的 WebsocketClient 在首次 subscribe() 时自动建连,
* 此处主动调用 connectPublic() 预热连接并注册 formattedMessage 监听。
*/
async connect(): Promise<void> {
if (this._connectionState === "connected") {
logger.debug(`[binance] 已连接,跳过重复 connect`);
return;
}
this.setConnectionState("connecting");
try {
// 初始化 WebSocket 客户端
this.wsClient = new WebsocketClient({
// 生产环境使用 Binance 主网
// useTestnet: false 为默认值
});
// 初始化 REST 客户端(公开接口无需 API Key)
this.restClient = new MainClient();
// 注册 formattedMessage 事件 —— SDK 将原始 JSON 解析为类型化对象
this.wsClient.on("formattedMessage", this.onFormattedMessage.bind(this));
// 注册重连事件(利用 SDK 内置的自动重连)
this.wsClient.on("reconnecting", (evt) => {
logger.warn(
{ wsKey: evt.wsKey },
`[binance] WebSocket 重连中...`,
);
this.setConnectionState("connecting");
});
this.wsClient.on("reconnected", (evt) => {
logger.info(
{ wsKey: evt.wsKey, wsUrl: evt.wsUrl },
`[binance] WebSocket 重连成功`,
);
this.setConnectionState("connected");
this.resetReconnectAttempts();
});
this.wsClient.on("close", (evt) => {
logger.warn(
{ wsKey: evt.wsKey },
`[binance] WebSocket 连接关闭`,
);
// 如果之前是已连接状态,SDK 会自动重连
if (this._connectionState === "connected") {
this.setConnectionState("connecting");
}
});
// 预热连接(SDK 连接到 spot 公开行情端点)
await this.wsClient.connectPublic();
this.setConnectionState("connected");
this.resetReconnectAttempts();
logger.info(`[binance] WebSocket 连接已建立`);
} catch (err) {
this.setConnectionState("error");
logger.error({ err }, `[binance] 连接失败`);
throw err;
}
}
/**
* 断开连接并清理资源。
*
* 1. 取消所有 WebSocket 订阅
* 2. 关闭 WebSocket 客户端
* 3. Complete 所有 RxJS Subject
*/
async disconnect(): Promise<void> {
logger.info(`[binance] 断开连接...`);
try {
// 取消所有订阅
if (this.wsClient && this.subscribedTradeSymbols.size > 0) {
const topics = [...this.subscribedTradeSymbols].map(
(s) => `${s.toLowerCase()}@trade`,
);
await this.wsClient.unsubscribe(topics, "main");
}
if (this.wsClient && this.subscribedTickerSymbols.size > 0) {
const topics = [...this.subscribedTickerSymbols].map(
(s) => `${s.toLowerCase()}@ticker`,
);
await this.wsClient.unsubscribe(topics, "main");
}
if (this.wsClient && this.subscribedOrderbookDepths.size > 0) {
for (const [symbol, depth] of this.subscribedOrderbookDepths) {
const topic = `${symbol.toLowerCase()}@depth${depth}@100ms`;
await this.wsClient.unsubscribe([topic], "main");
}
}
} catch (err) {
logger.warn({ err }, `[binance] 取消订阅时出错(忽略)`);
}
// 关闭 WS 客户端所有连接(SDK 自动关闭底层 WebSocket
try {
this.wsClient?.closeAll();
} catch {
// 忽略关闭错误
}
// Complete 所有 Subject
this.tickerSubject?.complete();
this.tradeSubject?.complete();
for (const subject of this.orderbookSubjects.values()) {
subject.complete();
}
this.orderbookSubjects.clear();
this.subscribedTickerSymbols.clear();
this.subscribedTradeSymbols.clear();
this.subscribedOrderbookDepths.clear();
this.setConnectionState("disconnected");
logger.info(`[binance] 已断开连接`);
}
// ============================================================
// 订阅 Ticker24h 滚动统计)
// ============================================================
subscribeTicker(symbols: string[]): Observable<Ticker> {
if (!this.tickerSubject) {
this.tickerSubject = this.createManagedSubject<Ticker>();
}
const newSymbols = symbols.filter(
(s) => !this.subscribedTickerSymbols.has(s),
);
if (newSymbols.length > 0 && this._connectionState === "connected") {
const topics = newSymbols.map(
(s) => `${s.toLowerCase()}@ticker`,
);
this.wsClient.subscribe(topics, "main").catch((err) => {
logger.error({ err, symbols: newSymbols }, `[binance] 订阅 ticker 失败`);
});
for (const s of newSymbols) {
this.subscribedTickerSymbols.add(s);
}
logger.info(
{ count: newSymbols.length, symbols: newSymbols },
`[binance] 订阅 ticker`,
);
}
return this.tickerSubject.asObservable();
}
// ============================================================
// 订阅逐笔成交
// ============================================================
subscribeTrade(symbols: string[]): Observable<Trade> {
if (!this.tradeSubject) {
this.tradeSubject = this.createManagedSubject<Trade>();
}
const newSymbols = symbols.filter(
(s) => !this.subscribedTradeSymbols.has(s),
);
if (newSymbols.length > 0 && this._connectionState === "connected") {
const topics = newSymbols.map(
(s) => `${s.toLowerCase()}@trade`,
);
this.wsClient.subscribe(topics, "main").catch((err) => {
logger.error({ err, symbols: newSymbols }, `[binance] 订阅 trade 失败`);
});
for (const s of newSymbols) {
this.subscribedTradeSymbols.add(s);
}
logger.info(
{ count: newSymbols.length, symbols: newSymbols },
`[binance] 订阅 trade`,
);
}
return this.tradeSubject.asObservable();
}
// ============================================================
// 订阅订单簿深度
// ============================================================
subscribeOrderbook(symbol: string, depth: number = 20): Observable<OrderBook> {
const key = `${symbol}@${depth}`;
let subject = this.orderbookSubjects.get(key);
if (subject) {
return subject.asObservable();
}
subject = this.createManagedSubject<OrderBook>();
this.orderbookSubjects.set(key, subject);
if (this._connectionState === "connected") {
const topic = `${symbol.toLowerCase()}@depth${depth}@100ms`;
this.wsClient.subscribe([topic], "main").catch((err) => {
logger.error({ err, symbol, depth }, `[binance] 订阅 orderbook 失败`);
});
this.subscribedOrderbookDepths.set(symbol, depth);
logger.info({ symbol, depth }, `[binance] 订阅 orderbook`);
}
return subject.asObservable();
}
// ============================================================
// REST:拉取历史 K 线(补缺失数据 / 回测)
// ============================================================
/**
* 通过 Binance REST API 拉取历史 K 线。
*
* Binance 限制:
* - 单次最多 1000 条(默认 500)
* - 权重 21200 权重/分钟 → 600 次请求/分钟)
* - 自动分页逻辑:如果时间跨度超过 limit 条,自动多次请求拼接
*
* @param symbol - 交易对(如 BTCUSDT
* @param interval - K 线周期
* @param startTime - 起始时间(Unix ms
* @param endTime - 结束时间(Unix ms
* @param limit - 单次最大条数(默认 500,最大 1000)
*/
async fetchKlines(
symbol: string,
interval: KlineInterval,
startTime: number,
endTime: number,
limit: number = 500,
): Promise<Kline[]> {
const binanceInterval = INTERVAL_TO_BINANCE[interval];
const intervalMs = KLINE_INTERVAL_MS[interval];
const maxLimit = Math.min(limit, 1000); // Binance 硬限制 1000
const allKlines: Kline[] = [];
let currentStart = startTime;
// 自动分页:如果时间跨度超过 maxLimit 条 K 线,分批拉取
while (currentStart < endTime) {
// 速率限制(保守节流)
const throttleKey = `${symbol}:${interval}`;
const lastFetch = this.lastRestFetch.get(throttleKey) ?? 0;
const elapsed = Date.now() - lastFetch;
if (elapsed < this.config.restRateLimitMs) {
await new Promise((r) =>
setTimeout(r, this.config.restRateLimitMs - elapsed),
);
}
this.lastRestFetch.set(throttleKey, Date.now());
try {
const rawKlines = await this.restClient.getKlines({
symbol,
interval: binanceInterval,
startTime: currentStart,
endTime,
limit: maxLimit,
});
if (!rawKlines || rawKlines.length === 0) {
break; // 无更多数据
}
// 转换 Binance REST K 线 → 本系统标准化 K 线
const converted = rawKlines.map((k) =>
this.convertRestKline(k, symbol, interval),
);
allKlines.push(...converted);
// Binance REST K 线格式:[openTime, open, high, low, close, volume, closeTime, ...]
// 最后一条的开盘时间 + interval 作为下一批的起点
const lastKline = rawKlines[rawKlines.length - 1]!;
const lastOpenTime = (lastKline as number[])[0] as number;
currentStart = lastOpenTime + intervalMs;
// 如果返回数量 < limit,说明已拉完
if (rawKlines.length < maxLimit) {
break;
}
} catch (err) {
logger.error(
{ err, symbol, interval, currentStart, endTime },
`[binance] fetchKlines 失败`,
);
throw err;
}
}
logger.debug(
{ symbol, interval, count: allKlines.length, startTime, endTime },
`[binance] fetchKlines 完成`,
);
return allKlines;
}
// ============================================================
// REST:拉取交易对元数据
// ============================================================
/**
* 从 Binance 获取所有现货交易对信息,转换为本系统 MarketInfo 格式。
*
* 用于自动注册到 trading_pairs 表,避免手动配置。
*/
async fetchMarkets(): Promise<MarketInfo[]> {
logger.info(`[binance] 拉取交易对信息...`);
try {
const exchangeInfo = await this.restClient.getExchangeInfo();
const markets: MarketInfo[] = [];
for (const symbolInfo of exchangeInfo.symbols) {
// 仅保留状态为 TRADING 的现货交易对
if (symbolInfo.status !== "TRADING") continue;
const filters = symbolInfo.filters;
// 从 filters 中提取交易规则
let tickSize: string | undefined;
let stepSize: string | undefined;
let minQty: string | undefined;
let minNotional: string | undefined;
for (const filter of filters) {
switch (filter.filterType) {
case "PRICE_FILTER":
tickSize = (filter as { tickSize: string }).tickSize;
break;
case "LOT_SIZE":
stepSize = (filter as { stepSize: string }).stepSize;
minQty = (filter as { minQty: string }).minQty;
break;
case "MIN_NOTIONAL":
case "NOTIONAL":
minNotional = (filter as { minNotional: string }).minNotional;
break;
}
}
markets.push({
symbol: symbolInfo.symbol,
baseAsset: symbolInfo.baseAsset,
quoteAsset: symbolInfo.quoteAsset,
pricePrecision: symbolInfo.quoteAssetPrecision,
quantityPrecision: symbolInfo.baseAssetPrecision,
minQty: minQty ? parseFloat(minQty) : undefined,
stepSize: stepSize ? parseFloat(stepSize) : undefined,
minNotional: minNotional ? parseFloat(minNotional) : undefined,
});
}
logger.info(
{ count: markets.length },
`[binance] 交易对信息拉取完成`,
);
return markets;
} catch (err) {
logger.error({ err }, `[binance] fetchMarkets 失败`);
throw err;
}
}
// ============================================================
// 内部:formattedMessage 事件分发
// ============================================================
/**
* Binance SDK formattedMessage 回调。
*
* SDK 已将原始 WebSocket JSON 解析为类型化事件对象。
* WsFormattedMessage 是复杂联合类型(含单事件 + 事件数组),
* TypeScript 的判别联合在此处不够精确。内部使用 `as any` 绕过
* 联合类型限制,按 eventType 字符串运行时路由。
*/
private onFormattedMessage(msg: WsFormattedMessage): void {
try {
// 数组类型(如 !ticker@arr → WsMessage24hrTickerFormatted[]
if (Array.isArray(msg)) {
for (const item of msg) {
this.routeByEventType(item as unknown as Record<string, unknown>);
}
return;
}
this.routeByEventType(msg as unknown as Record<string, unknown>);
} catch (err) {
const raw = msg as unknown as Record<string, unknown>;
const eventType = String(raw["eventType"] ?? "unknown");
logger.error(
{ err, eventType },
`[binance] 处理 formattedMessage 时出错`,
);
}
}
/**
* 按 eventType 运行时路由到对应 Subject。
*
* 此处使用 unknown → Record 转换,因为 WsFormattedMessage
* 联合类型包含数组成员导致无法直接访问 eventType。
*/
private routeByEventType(raw: Record<string, unknown>): void {
const eventType = String(raw["eventType"] ?? "");
if (!eventType) return;
switch (eventType) {
case "24hrTicker":
case "!ticker@arr":
this.handleTickerMessage(
raw as unknown as WsMessage24hrTickerFormatted,
);
break;
case "trade":
this.handleTradeMessage(
raw as unknown as WsMessageTradeFormatted,
);
break;
case "bookTicker":
this.handleBookTickerMessage(
raw as unknown as WsMessageBookTickerEventFormatted,
);
break;
case "partialBookDepth":
this.handleOrderbookMessage(
raw as unknown as WsMessagePartialBookDepthEventFormatted,
);
break;
case "kline":
// K 线事件不在 adapter 层分发,由 pipeline 的 KlineSynthesizer 处理
break;
default:
// 忽略其他事件类型(用户数据流、账户更新等)
break;
}
}
// ----------------------------------------------------------
// 事件转换器:Binance → 本系统标准化类型
// ----------------------------------------------------------
/** 24h Ticker → Ticker */
private handleTickerMessage(msg: WsMessage24hrTickerFormatted): void {
if (!this.tickerSubject || this.tickerSubject.closed) return;
const ticker: Ticker = {
exchange: "binance",
symbol: msg.symbol,
lastPrice: msg.currentClose,
openPrice: msg.open,
highPrice: msg.high,
lowPrice: msg.low,
volume: msg.baseAssetVolume,
quoteVolume: msg.quoteAssetVolume,
priceChange: msg.priceChange,
priceChangePercent: msg.priceChangePercent,
bidPrice: msg.bestBid,
bidQty: msg.bestBidQuantity,
askPrice: msg.bestAskPrice,
askQty: msg.bestAskQuantity,
eventTime: msg.eventTime,
closeTime: msg.closeTime,
};
this.tickerSubject.next(ticker);
}
/** 逐笔成交 → Trade */
private handleTradeMessage(msg: WsMessageTradeFormatted): void {
if (!this.tradeSubject || this.tradeSubject.closed) return;
const trade: Trade = {
exchange: "binance",
symbol: msg.symbol,
price: msg.price,
amount: msg.quantity,
quoteAmount: msg.price * msg.quantity,
timestamp: msg.time,
isBuyerMaker: msg.maker,
tradeId: String(msg.tradeId),
};
this.tradeSubject.next(trade);
}
/** BookTicker → Ticker(精简版,仅有最佳买卖价) */
private handleBookTickerMessage(msg: WsMessageBookTickerEventFormatted): void {
// BookTicker 是 Ticker 的精简版,仅更新最佳买卖价
// 如果有 tickerSubject,将其作为轻量 Ticker 推送
if (!this.tickerSubject || this.tickerSubject.closed) return;
const ticker: Ticker = {
exchange: "binance",
symbol: msg.symbol,
lastPrice: 0, // bookTicker 不含最新价
openPrice: 0,
highPrice: 0,
lowPrice: 0,
volume: 0,
quoteVolume: 0,
priceChange: 0,
priceChangePercent: 0,
bidPrice: msg.bidPrice,
bidQty: msg.bidQty,
askPrice: msg.askPrice,
askQty: msg.askQty,
eventTime: msg.eventTime,
closeTime: 0,
};
this.tickerSubject.next(ticker);
}
/** 订单簿深度快照 → OrderBook */
private handleOrderbookMessage(msg: WsMessagePartialBookDepthEventFormatted): void {
// partialBookDepth 不含 symbol 字段(取决于 SDK 版本),
// 从 stream 名称或上下文推断 symbol。此处假设 SDK 已填充 symbol。
const symbol = (msg as WsMessagePartialBookDepthEventFormatted & { symbol?: string }).symbol;
if (!symbol) {
logger.warn(`[binance] 收到无 symbol 的 orderbook 消息,丢弃`);
return;
}
// 查找匹配的 orderbook Subject(遍历所有已订阅 depth
for (const [key, subject] of this.orderbookSubjects) {
const [subscribedSymbol] = key.split("@");
if (
subscribedSymbol?.toUpperCase() === symbol.toUpperCase() &&
!subject.closed
) {
const orderbook: OrderBook = {
exchange: "binance",
symbol: symbol.toUpperCase(),
bids: msg.bids.map(
([price, qty]) => [parseFloat(String(price)), parseFloat(String(qty))] as [number, number],
),
asks: msg.asks.map(
([price, qty]) => [parseFloat(String(price)), parseFloat(String(qty))] as [number, number],
),
lastUpdateId: msg.lastUpdateId,
eventTime: Date.now(), // partialBookDepth 不含 eventTime
};
subject.next(orderbook);
return;
}
}
}
// ============================================================
// 内部:REST K 线格式转换
// ============================================================
/**
* 将 Binance REST K 线数组(元组)转换为本系统 Kline 对象。
*
* Binance REST K 线格式:
* [
* 0: openTime (ms),
* 1: open (string),
* 2: high (string),
* 3: low (string),
* 4: close (string),
* 5: volume (string),
* 6: closeTime (ms),
* 7: quoteVolume (string),
* 8: tradeCount (number),
* 9: takerBuyBaseVol (string),
* 10: takerBuyQuoteVol (string),
* 11: ignore (string)
* ]
*/
private convertRestKline(
raw: BinanceRestKline,
symbol: string,
interval: KlineInterval,
): Kline {
// BinanceRestKline 是元组类型,按位置索引
const arr = raw as unknown as [
number, // 0: openTime
string, // 1: open
string, // 2: high
string, // 3: low
string, // 4: close
string, // 5: volume
number, // 6: closeTime
string, // 7: quoteVolume
number, // 8: tradeCount
string, // 9: takerBuyBaseVol
string, // 10: takerBuyQuoteVol
string, // 11: ignore
];
return {
exchange: "binance",
symbol,
interval,
openTime: arr[0],
closeTime: arr[6],
open: parseFloat(arr[1]),
high: parseFloat(arr[2]),
low: parseFloat(arr[3]),
close: parseFloat(arr[4]),
volume: parseFloat(arr[5]),
quoteVolume: parseFloat(arr[7]),
takerBuyBaseVol: parseFloat(arr[9]),
takerBuyQuoteVol: parseFloat(arr[10]),
tradeCount: arr[8],
isClosed: true, // REST 返回的 K 线都是已闭合的
};
}
}
export default BinanceAdapter;
+253
View File
@@ -0,0 +1,253 @@
import { MainClient, type Kline as BinanceRestKline } from "binance";
import { logger } from "../utils/logger";
import { BaseRestClient } from './base_rest';
import type { KlineInterval, Kline, MarketInfo } from '../types';
/** K 线周期 → 毫秒数映射(用于时间桶计算) */
export const KLINE_INTERVAL_MS: Record<KlineInterval, number> = {
"1m": 60_000,
"5m": 300_000,
"15m": 900_000,
"30m": 1_800_000,
"1h": 3_600_000,
"4h": 14_400_000,
"1d": 86_400_000,
"1w": 604_800_000,
};
// ============================================================
// Binance REST K 线 → 本系统标准化 Kline 转换
// ============================================================
/**
* Binance SDK Kline 元组格式(getKlines / getUIKlines 返回):
* [0] openTime: number — 开盘时间(Unix ms)
* [1] open: numberInString — 开盘价
* [2] high: numberInString — 最高价
* [3] low: numberInString — 最低价
* [4] close: numberInString — 收盘价
* [5] volume: numberInString — 成交量(base 币种)
* [6] closeTime: number — 收盘时间(Unix ms)
* [7] quoteVolume: numberInString — 成交额(quote 币种)
* [8] tradeCount: number — 成交笔数
* [9] takerBuyBaseVol: numberInString — 主动买入成交量
* [10] takerBuyQuoteVol: numberInString — 主动买入成交额
* [11] ignore: numberInString — 忽略字段
*
* numberInString = string | number,通过 Number() 统一转换。
*
* 参考:node_modules/binance/lib/types/shared.d.ts:85-98
*/
function convertBinanceKline(
raw: BinanceRestKline,
symbol: string,
interval: KlineInterval,
): Kline {
const [
openTime,
open,
high,
low,
close,
volume,
closeTime,
quoteVolume,
tradeCount,
takerBuyBaseVol,
takerBuyQuoteVol,
// [11] ignore — 丢弃
] = raw;
return {
exchange: "binance",
symbol,
interval,
openTime: openTime,
closeTime: closeTime,
open: String(open),
high: String(high),
low: String(low),
close: String(close),
volume: String(volume),
quoteVolume: String(quoteVolume),
takerBuyBaseVol: String(takerBuyBaseVol),
takerBuyQuoteVol: String(takerBuyQuoteVol),
tradeCount: String(tradeCount),
isClosed: true, // REST 返回的 K 线均为已闭合历史数据
};
}
// ============================================================
// Binance REST 拉取函数
// ============================================================
/**
* 通过 Binance 原生 SDK 拉取 UI K 线并转换为本系统 Kline。
*
* getUIKlines 与 getKlines 返回同构的 Kline[] 元组,
* getUIKlines 额外支持 timeZone 参数,适合按交易所时区对齐。
*
* @param symbol - 交易对(如 BTCUSDT
* @param interval - K 线周期
* @param startTime - 起始时间(Unix ms
* @param endTime - 结束时间(Unix ms),可选
* @param limit - 单次拉取条数,默认 500(最大 1000)
*/
async function fetchBinanceKlines(
symbol: string,
interval: KlineInterval,
startTime: number,
endTime?: number,
limit = 500,
): Promise<Kline[]> {
const client = new MainClient({
api_key: 'ONSJKIGRpDYLn6FdV17aAKfjclZ4I2LzamflhuMpsoRQA427lLKeyJlGtg2RZ7DH',
api_secret: '5Mfv4TgvDlRzCHbtl2nJL4mVHUvMm8pyjKiRjMoosBMxrhlqMw6CuQbg2qbS2Npd',
});
// Binance 硬限制:单次最多 1000 条
const safeLimit = Math.min(limit, 1000);
const rawKlines = await client.getKlines({
symbol,
interval,
startTime,
endTime,
limit: safeLimit,
});
logger.info({
symbol,
interval,
startTime,
endTime,
limit: safeLimit,
}, 'fetchBinanceKlines arguments');
if (!rawKlines || rawKlines.length === 0) {
return [];
}
return filterConsecutive(
rawKlines.map((k, index) => {
// if (index === rawKlines.length - 1) {
// console.log(k);
// }
return convertBinanceKline(k, symbol, interval);
}),
interval,
);
}
/**
* 过滤出严格连续(无时间缺口)的 K 线序列。
*
* 处理流程:
* 1. 按 openTime 升序排序(防御性,确保时间单调递增)
* 2. 从首条 K 线开始遍历,仅保留相邻间隔恰好等于 intervalMs 的条目
* 3. 一旦检测到缺口(间隔 ≠ intervalMs),立即终止并丢弃后续所有数据
*
* 设计意图:
* - 时间序列分析(回测、指标计算)依赖连续数据,缺口会引入偏误
* - 缺口之后的数据可能来自另一段不连续的拉取结果,混入后风险更高
* - "截断"策略优于"填充/跳过",避免伪造数据或隐藏数据质量问题
*
* @param klines - 待过滤的 K 线数组(可能乱序、可能含缺口)
* @param interval - K 线周期,用于查表获取 intervalMs
* @returns 从首条开始严格连续的最大前缀子序列;空数组无缺口时返回完整排序结果
*/
function filterConsecutive(klines: Kline[], interval: KlineInterval) {
// 查表获取当前 K 线周期对应的毫秒数
const intervalMs = KLINE_INTERVAL_MS[interval];
// 防御性排序:Binance API 不保证返回顺序,升序排列确保时间单调
const results = klines.sort((a: Kline, b: Kline) => {
return a.openTime - b.openTime;
});
return results;
// console.log(results);
// let _openTime = 0; // 哨兵:0 表示尚未初始化,非 0 表示上一条已收录 K 线的 openTime
// const rets: Kline[] = []; // 累积连续 K 线结果
// for (let item of results) {
// console.log(item.openTime);
// // 分支 1 —— 首条 K 线:无条件收录,并初始化哨兵
// if (_openTime === 0) {
// _openTime = item.openTime;
// rets.push(item);
// continue;
// }
// // 分支 2 —— 严格连续:当前 openTime 与上一条恰好相差一个周期
// if (item.openTime - _openTime === intervalMs) {
// _openTime = item.openTime;
// rets.push(item);
// continue;
// }
// // 分支 3 —— 检测到缺口:截断,丢弃当前及之后所有 K 线
// break;
// }
// return rets;
}
// ============================================================
// Client —— 多交易所 REST 客户端
// ============================================================
export class Client extends BaseRestClient {
exchange: string;
/**
* @param exchange - 交易所 ID(如 "binance"、"okx"、"bybit"
* 内部根据 ID 分发到对应的 SDK 实现
*/
constructor(exchange: string) {
super();
this.exchange = exchange;
}
/**
* 拉取历史 K 线数据,返回标准化 Kline 数组。
*
* 根据交易所 ID 分发到各自的 SDK 拉取函数。
*
* @param symbol - 交易对符号(如 BTCUSDT
* @param interval - K 线周期
* @param startTime - 起始时间(Unix ms
* @param endTime - 结束时间(Unix ms),可选
* @param limit - 最大返回条数,默认取自 config.defaultLimit
*/
async fetchKlines(
symbol: string,
interval: KlineInterval,
startTime: number,
limit?: number,
endTime?: number,
): Promise<Kline[]> {
const effectiveLimit = limit ?? this.config.defaultLimit;
switch (this.exchange) {
case "binance":
return fetchBinanceKlines(symbol, interval, startTime, endTime, effectiveLimit);
// TODO: 新增交易所在此添加 case
// case "okx":
// return fetchOkxKlines(symbol, interval, startTime, endTime, effectiveLimit);
default:
throw new Error(
`[Client] 不支持的交易所: "${this.exchange}"` +
`当前仅支持: binance`,
);
}
}
async fetchMarkets(): Promise<MarketInfo[]> {
// TODO: 各交易所实现
return [];
}
}
-299
View File
@@ -1,299 +0,0 @@
// ============================================================
// types.ts — 统一行情数据类型定义与 MarketDataFeed 接口
// ============================================================
// 所有交易所适配器共享的数据结构和接口契约。
// 适配器负责将交易所原生数据格式转换为以下标准化类型。
//
// 设计原则:
// - 字段语义与 Binance/OKX/Bybit 通用概念对齐
// - 时间戳统一使用 Unix 毫秒(number),便于排序和计算
// - 价格/数量使用 number 类型(JavaScript 64-bit float),
// 对精度敏感场景(如 orderbook 快照)保留原始字符串
// ============================================================
import type { Observable } from "rxjs";
// ============================================================
// K 线周期
// ============================================================
/** K 线周期枚举(与 kline.entity.ts 中 KlineInterval 保持一致) */
export type KlineInterval =
| "1m"
| "5m"
| "15m"
| "30m"
| "1h"
| "4h"
| "1d"
| "1w";
/** K 线周期 → 毫秒数映射(用于时间桶计算) */
export const KLINE_INTERVAL_MS: Record<KlineInterval, number> = {
"1m": 60_000,
"5m": 300_000,
"15m": 900_000,
"30m": 1_800_000,
"1h": 3_600_000,
"4h": 14_400_000,
"1d": 86_400_000,
"1w": 604_800_000,
};
// ============================================================
// 标准化行情数据结构
// ============================================================
/** 24 小时滚动 Ticker 统计 */
export interface Ticker {
/** 交易所标识 */
exchange: string;
/** 交易对符号(大写,如 BTCUSDT) */
symbol: string;
/** 最新成交价 */
lastPrice: number;
/** 24h 开盘价 */
openPrice: number;
/** 24h 最高价 */
highPrice: number;
/** 24h 最低价 */
lowPrice: number;
/** 24h 成交量(base 币种) */
volume: number;
/** 24h 成交额(quote 币种) */
quoteVolume: number;
/** 24h 价格变化 */
priceChange: number;
/** 24h 价格变化百分比(0.05 = 5% */
priceChangePercent: number;
/** 买一价 */
bidPrice: number;
/** 买一量 */
bidQty: number;
/** 卖一价 */
askPrice: number;
/** 卖一量 */
askQty: number;
/** 事件发生时间(Unix ms */
eventTime: number;
/** 交易所收盘时间(Unix ms,用于判断 K 线是否闭合) */
closeTime: number;
}
/** 逐笔成交 */
export interface Trade {
/** 交易所标识 */
exchange: string;
/** 交易对符号 */
symbol: string;
/** 成交价 */
price: number;
/** 成交数量(base 币种) */
amount: number;
/** 成交额(quote 币种 = price × amount */
quoteAmount: number;
/** 成交时间(Unix ms */
timestamp: number;
/** 买方是否为挂单方(true = 主动卖出 / taker sell */
isBuyerMaker: boolean;
/** 交易所成交 ID(可能为字符串,如 Binance tradeId 为 bigint */
tradeId: string;
}
/** 订单簿深度快照 */
export interface OrderBook {
/** 交易所标识 */
exchange: string;
/** 交易对符号 */
symbol: string;
/** 买单列表 [[price, qty], ...],按价格降序(买一在前) */
bids: [number, number][];
/** 卖单列表 [[price, qty], ...],按价格升序(卖一在前) */
asks: [number, number][];
/** 上次更新 ID */
lastUpdateId: number;
/** 事件发生时间(Unix ms */
eventTime: number;
}
/** 标准化 K 线(OHLCV */
export interface Kline {
/** 交易所标识 */
exchange: string;
/** 交易对符号 */
symbol: string;
/** K 线周期 */
interval: KlineInterval;
/** 开盘时间(Unix ms */
openTime: number;
/** 收盘时间(Unix ms */
closeTime: number;
/** 开盘价 */
open: number;
/** 最高价 */
high: number;
/** 最低价 */
low: number;
/** 收盘价 */
close: number;
/** 成交量(base 币种) */
volume: number;
/** 成交额(quote 币种) */
quoteVolume: number;
/** 主动买入成交量(base 币种) */
takerBuyBaseVol: number;
/** 主动买入成交额(quote 币种) */
takerBuyQuoteVol: number;
/** 成交笔数 */
tradeCount: number;
/** 该 K 线是否已关闭(不再更新) */
isClosed: boolean;
}
/** K 线增量更新(仅推送最新一根 OHLCV 变化) */
export interface KlineDelta {
exchange: string;
symbol: string;
interval: KlineInterval;
openTime: number;
closeTime: number;
open: number;
high: number;
low: number;
close: number;
volume: number;
isClosed: boolean;
}
// ============================================================
// WebSocket 连接状态
// ============================================================
/** 连接状态枚举 */
export type ConnectionState =
| "disconnected"
| "connecting"
| "connected"
| "error";
// ============================================================
// 适配器配置
// ============================================================
/** 交易所适配器通用配置 */
export interface AdapterConfig {
/** 指数退避重连基数(毫秒),默认 3000 */
reconnectBaseDelayMs: number;
/** 最大重连次数,默认 10 */
maxReconnectAttempts: number;
/** REST API 请求冷却时间(毫秒),默认 200 */
restRateLimitMs: number;
}
/** 默认适配器配置 */
export const DEFAULT_ADAPTER_CONFIG: AdapterConfig = {
reconnectBaseDelayMs: 3000,
maxReconnectAttempts: 10,
restRateLimitMs: 200,
};
// ============================================================
// MarketDataFeed 接口 —— 所有交易所适配器必须实现
// ============================================================
/**
* 统一行情数据源接口。
*
* 每个交易所适配器实现此接口,向上层管道暴露标准化数据流。
* 使用 RxJS Observable 作为统一推送机制,pipeline 层可自由
* 组合、过滤、分流各交易所数据。
*/
export interface MarketDataFeed {
/** 交易所标识(如 "binance" */
readonly exchange: string;
/** 当前连接状态 */
readonly connectionState: ConnectionState;
/** 建立 WebSocket 连接 */
connect(): Promise<void>;
/** 断开连接 */
disconnect(): Promise<void>;
/**
* 订阅 24h 滚动 Ticker 流。
* 每笔成交触发推送(Binance: <symbol>@ticker)。
*/
subscribeTicker(symbols: string[]): Observable<Ticker>;
/**
* 订阅逐笔成交流。
* 实时推送每笔撮合成交(Binance: <symbol>@trade)。
*/
subscribeTrade(symbols: string[]): Observable<Trade>;
/**
* 订阅订单簿深度。
* depth 参数指定档位(如 5/10/20),默认 20。
*/
subscribeOrderbook(symbol: string, depth?: number): Observable<OrderBook>;
/**
* REST 拉取历史 K 线(用于补齐缺失数据或回测)。
*
* @param symbol - 交易对符号
* @param interval - K 线周期
* @param startTime - 起始时间(Unix ms
* @param endTime - 结束时间(Unix ms
* @param limit - 最大返回条数(默认 500)
* @returns 标准化 K 线数组,按时间升序
*/
fetchKlines(
symbol: string,
interval: KlineInterval,
startTime: number,
endTime: number,
limit?: number,
): Promise<Kline[]>;
/**
* 获取交易所交易对信息(用于自动注册到 trading_pairs 表)。
* 返回标准化后的交易对元数据。
*/
fetchMarkets(): Promise<MarketInfo[]>;
}
/** 交易对元信息(从交易所 REST API 获取) */
export interface MarketInfo {
symbol: string;
baseAsset: string;
quoteAsset: string;
pricePrecision: number;
quantityPrecision: number;
minQty?: number;
stepSize?: number;
minNotional?: number;
}
// ============================================================
// 工具类型
// ============================================================
/** Binance WebSocket 原始 K 线数据(kline 事件中的 k 字段) */
export interface BinanceRawKline {
t: number; // K 线开始时间
T: number; // K 线结束时间
s: string; // 交易对
i: string; // 周期
o: string; // 开盘价
h: string; // 最高价
l: string; // 最低价
c: string; // 收盘价
v: string; // 成交量
n: number; // 成交笔数
x: boolean; // 是否已关闭
q: string; // 成交额
V: string; // 主动买入成交量
Q: string; // 主动买入成交额
}