feat(ws): kline:tick 微回补机制消除回补→WS衔接缺口 (#7)

- bus.ts: 新增 kline:tick 事件类型
- ws.ts: toKline() 提取;final=false 时 emit kline:tick
- start.ts: 监听 kline:tick,首次 tick 触发 backfillKline 微回补
- backfill.ts: 回溯10分钟注释说明
This commit is contained in:
Rekey
2026-06-18 23:14:13 +08:00
parent 2c7d993049
commit 6374159e53
4 changed files with 46 additions and 8 deletions
+13 -6
View File
@@ -9,7 +9,9 @@ import type { Kline, PairType } from "../../types";
// ============================================================
// 继承 BaseWsClient,封装 Binance WebsocketClient。
// 每个 exchange:type 组合对应一个实例,内部管理一个 WS 连接。
// 收到闭合 K 线后标准化为 Kline → bus.emit("kline:update")。
// 收到 K 线后标准化为 Kline → bus.emit
// 未闭合(final=false)→ "kline:tick"(衔接窗口检测 #7
// 已闭合(final=true → "kline:update"
// ============================================================
type WsKey = "main" | "usdm";
@@ -62,11 +64,8 @@ export class BinanceWsClient extends BaseWsClient {
this.symbols.delete(symbol);
}
private handleKline(msg: WsMessageKlineFormatted): void {
if (!msg.kline.final) {
return;
};
const kline: Kline = {
private toKline(msg: WsMessageKlineFormatted): Kline {
return {
exchange: this.exchange,
symbol: msg.symbol,
type: this.type,
@@ -84,6 +83,14 @@ export class BinanceWsClient extends BaseWsClient {
tradeCount: String(msg.kline.trades),
isClosed: msg.kline.final,
};
}
private handleKline(msg: WsMessageKlineFormatted): void {
const kline = this.toKline(msg);
if (!msg.kline.final) {
bus.emit("kline:tick", kline);
return;
}
bus.emit("kline:update", kline);
}
}
+28 -2
View File
@@ -1,7 +1,7 @@
import { logger, bus } from "../utils";
import { backfillKlines } from "../service/backfill";
import { backfillKline, backfillKlines } from "../service/backfill";
import { upsertOrUpdateKlines } from "../service/kline";
import { updatePairWsTime } from "../service/pair";
import { updatePairWsTime, getPairBySymbol } from "../service/pair";
import { checkAndRefresh } from "../service/aggregate";
import { watchKline } from "../exchanges";
import { AppDataSource } from "../db/data-source";
@@ -82,6 +82,32 @@ bus.on("kline:update", (kline) => {
});
});
// kline:tick → 首次触发微回补,消除回补→WS 衔接缺口(#7)
const tickedSymbols = new Set<string>();
bus.on("kline:tick", async (kline) => {
const key = `${kline.type}:${kline.symbol}`;
if (tickedSymbols.has(key)) {
return;
}
tickedSymbols.add(key);
try {
const pair = await getPairBySymbol(kline.symbol, kline.type);
if (!pair) {
return;
}
// 去重:仅首次 tick 触发回补,后续 tick 忽略(常规 kline:update 流已覆盖)
// backfillKline 内部已含 10 分钟回溯重叠,无需额外处理
await backfillKline(pair);
} catch (err) {
logger.error({ err, symbol: kline.symbol }, "微回补失败");
// 非致命:WS 闭合 K 线到来后仍会正常入库
}
});
// kline:saved → 聚合刷新 → emit aggregate:refreshed
bus.on("kline:saved", (kline) => {
checkAndRefresh(kline.openTime)
+2
View File
@@ -49,6 +49,8 @@ export async function backfillKline(pair: TradingPair): Promise<IBackfillKlineRe
exchange: pair.exchange.name,
type: pair.type,
symbol: pair.symbol,
// 向前回溯 10 分钟重叠拉取,避免因时间对齐偏差导致 K 线缺口;
// lastBackfillTime === 0 表示首次回补,不做偏移。
startTime: lastBackfillTime === 0 ? lastBackfillTime : lastBackfillTime - 10 * 60 * 1000,
limit: 1000,
});
+3
View File
@@ -25,6 +25,9 @@ export type BusEvents = {
/** WebSocket 推送的已闭合 K 线(exchanges/ws 标准化后 emit */
"kline:update": [Kline];
/** WebSocket 推送的未闭合 1m K 线(衔接窗口检测,首次触发微回补 #7) */
"kline:tick": [Kline];
/** K 线入库 + pair 时间标记完成后 emit(聚合刷新等依赖入库的消费者监此事件) */
"kline:saved": [Kline];