fix: 聚合刷新改为按 pair 计数精确触发(方案 C)
问题:同一分钟多个 pair 的 1m K 线先后入库,旧代码每个 pair 各自触发 checkAndRefresh,先到达的 pair 刷新时其他 pair 数据 未入库,导致聚合视图不完整。 方案:用 Map<openTime, Set<pairKey>> 收集同分钟所有 pair, 全部到齐后统一触发一次 checkAndRefresh。 - start.ts: kline:saved 改为收集模式,readyCount 到齐才触发 - start.ts: backfillComplete 门控避免启动期 readyCount 不稳定 - start.ts: pairKey = exchange:type:symbol,与 pair:ready 对齐 - aggregate.ts: 删除 refreshLocks 去重锁,编排层已保证单次调用
This commit is contained in:
+67
-13
@@ -23,6 +23,7 @@ import { AppDataSource } from "../db/data-source";
|
||||
|
||||
let readyCount = 0;
|
||||
let errorCount = 0;
|
||||
let backfillComplete = false; // 回补全部完成后才允许聚合收集(避免启动期 readyCount 尚未达到终值)
|
||||
|
||||
// ---- 断线重连检测 ----
|
||||
const STALE_TIMEOUT_MS = 5 * 60 * 1000; // 5 分钟观察窗口
|
||||
@@ -62,7 +63,9 @@ bus.on("backfill:complete", ({ status }) => {
|
||||
if (status === "error") {
|
||||
logger.error("回补未全部成功,退出进程等待外部重启");
|
||||
AppDataSource.destroy().finally(() => process.exit(1));
|
||||
return;
|
||||
}
|
||||
backfillComplete = true;
|
||||
});
|
||||
|
||||
// kline:update → 入库 → 更新 pair 时间 → emit kline:saved
|
||||
@@ -108,21 +111,72 @@ bus.on("kline:tick", async (kline) => {
|
||||
}
|
||||
});
|
||||
|
||||
// kline:saved → 聚合刷新 → emit aggregate:refreshed
|
||||
// ============================================================
|
||||
// kline:saved → 聚合刷新
|
||||
// ============================================================
|
||||
//
|
||||
// 背景:同一分钟会有多个 pair(BTCUSDT、ETHUSDT 等)的 1m K 线先后入库。
|
||||
// 若每个 pair 到达立刻触发聚合刷新,则先到达的 pair 刷新时其他 pair 的
|
||||
// K 线尚未入库,导致该分钟聚合视图数据不完整。
|
||||
//
|
||||
// 方案:收集同一 openTime 下所有 pair 的 K 线,等全部到齐后统一触发一次
|
||||
// checkAndRefresh。用 Map<openTime, Set<pairKey>> 收集已入库的 pair,
|
||||
// Set.size 达到 readyCount(所有 pair:ready 计数)时触发。
|
||||
//
|
||||
// 门控:backfill:complete 之前 kline:saved 直接 return,避免启动期
|
||||
// readyCount 尚未稳定(部分 pair 已订阅 WS、部分还在回补)就触发收集。
|
||||
//
|
||||
// 不设兜底超时:若某 pair 掉线导致某个 openTime 永远达不到 readyCount,
|
||||
// 该 entry 留在 symbolCollector 中。量极小(每对 pair 同一分钟最多一个
|
||||
// openTime 未到齐),且 pair 恢复后下一分钟自然恢复正常。
|
||||
//
|
||||
// pairKey = `${exchange}:${type}:${symbol}`,三者组合唯一标识一个 pair,
|
||||
// 与 pair:ready 的计数粒度对齐。Set 自带去重:若同一 pair 的同一分钟
|
||||
// K 线被重复 emit,不会多计数。
|
||||
//
|
||||
// checkAndRefresh 签名不变,无内部锁,由编排层保证单次调用。
|
||||
//
|
||||
// aggregate:refreshed 同一 openTime 仅 emit 一次,携带最后到齐的 kline。
|
||||
// 后续由 Redis Pub/Sub 通知策略引擎消费聚合数据。
|
||||
// ============================================================
|
||||
const symbolCollector = new Map<number, Set<string>>();
|
||||
|
||||
// TODO: 当前 WS 延迟到达的 pair 可能在集合已 delete 后创建新 Set 成为孤儿 entry。
|
||||
// 未来可通过封装 getOrCreate/openTime 生命周期方法来确保同一 openTime
|
||||
// 只保留一个 Set,现阶段量极小收益不明显,暂不处理。
|
||||
|
||||
bus.on("kline:saved", (kline) => {
|
||||
checkAndRefresh(kline.openTime)
|
||||
.then(() => {
|
||||
bus.emit("aggregate:refreshed", kline);
|
||||
})
|
||||
// TODO: 后续接入 Telegram 等通知渠道,由 system:error 事件驱动自动告警
|
||||
.catch((err) => {
|
||||
logger.error({ err, symbol: kline.symbol, openTime: kline.openTime }, "聚合刷新失败");
|
||||
bus.emit("system:error", {
|
||||
source: "kline:saved",
|
||||
error: err,
|
||||
context: { symbol: kline.symbol, openTime: kline.openTime, type: kline.type },
|
||||
});
|
||||
if (!backfillComplete) {
|
||||
return; // 回补未完成,readyCount 尚未稳定,不收集
|
||||
}
|
||||
|
||||
const { openTime, symbol } = kline;
|
||||
|
||||
let symbols = symbolCollector.get(openTime);
|
||||
if (!symbols) {
|
||||
symbols = new Set();
|
||||
symbolCollector.set(openTime, symbols);
|
||||
}
|
||||
|
||||
const pairKey = `${kline.exchange}:${kline.type}:${symbol}`;
|
||||
symbols.add(pairKey);
|
||||
|
||||
if (symbols.size < readyCount) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug({ openTime, count: symbols.size, readyCount }, "symbol 到齐,触发聚合刷新");
|
||||
symbolCollector.delete(openTime);
|
||||
checkAndRefresh(openTime).then(() => {
|
||||
bus.emit("aggregate:refreshed", kline);
|
||||
}).catch((err) => {
|
||||
logger.error({ err, openTime, symbol: kline.symbol, type: kline.type }, "聚合刷新失败");
|
||||
bus.emit("system:error", {
|
||||
source: "kline:saved",
|
||||
error: err,
|
||||
context: { openTime, symbol: kline.symbol, type: kline.type },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// ---- 断线重连检测 ----
|
||||
|
||||
@@ -4,6 +4,11 @@
|
||||
// 提供 checkAndRefresh(openTime) 纯函数,不主动监听任何事件。
|
||||
// 由 run/start.ts 编排层在收到 "kline:saved" 时调用。
|
||||
//
|
||||
// 无并发锁:
|
||||
// 最小聚合周期为 3m,同一视图两次触发至少间隔 3 分钟。
|
||||
// 若 3m 聚合在 3 分钟内完不成,瓶颈在 DB 而非业务层——
|
||||
// 加锁只是掩盖问题,应排查数据库而非靠锁续命。
|
||||
//
|
||||
// 聚合视图依赖链(03-continuous-aggregates.sql):
|
||||
// klines (1m 基表)
|
||||
// ├── klines_3m ← 直接聚合 1m
|
||||
|
||||
Reference in New Issue
Block a user