fix(backfill): 修复回补超时竞态 + 首批单元测试 + 技术债务文档

- backfill.ts: clearTimeout 移入 try/catch 分支内部,防止 upsert 慢被超时误杀
- backfill.ts: setTimeout 回调和 catch 块各加 status 先到先得守卫
- backfill.ts: 注释补充容错策略(Binance SDK 僵死 → exit 重启)
- tests: 新增 4 文件 63 用例(utils/bus/convert-kline/config)
- docs: tech-debt.md 技术债务清单, testing.md 测试方案
This commit is contained in:
Rekey
2026-06-18 15:34:02 +08:00
parent ff4c66aee9
commit 0e449d3c5f
7 changed files with 882 additions and 10 deletions
+29 -10
View File
@@ -12,11 +12,18 @@ interface IBackfillKlineResult {
}
/**
* 回补单个交易对的 K 线历史数据
* 从上次回补时间开始逐批拉取,直到追平上一分钟(已闭合的 K 线)
* 每批最多 1000 条,批次间随机延迟 0~1s 防止触发交易所频率限制
* 单次请求设有 10s 超时,超时或网络错误标记为 'error'
* @param pair - 需要回补的交易对
* 回补单个交易对的 K 线历史数据
*
* 从 last_backfill_time 开始逐批拉取,直到追平上一分钟(已闭合的 K 线)。
* 每批最多 1000 条,批次间随机延迟 0~1s 防止触发交易所频率限制。
* 单次请求 10s 超时。
*
* **容错策略**
* 超时或任何异常均标记为 `error`,交由上层退出进程,由外部进程管理器
*systemd / pm2 / Docker)重启恢复。理由是 Binance SDK 存在已知僵死问题
*(无响应、不超时、不报错),此场景下仅表现为空数组;任何非正常路径都
* 通过退出重启来 reset,误杀可接受——重启后回补从 last_backfill_time 自动续接。
*
* @returns status: 'done' 表示成功追平,'error' 表示过程中出错
*/
export async function backfillKline(pair: TradingPair): Promise<IBackfillKlineResult> {
@@ -30,8 +37,11 @@ export async function backfillKline(pair: TradingPair): Promise<IBackfillKlineRe
break;
}
const timer = setTimeout(() => {
result.status = 'error';
result.error = new Error('超时');
// 先到先得:status 已被 catch 修改则忽略
if (result.status === 'done') {
result.status = 'error';
result.error = new Error('超时');
}
}, 10000);
try {
logger.info({ lastBackfillTime }, '回补进度');
@@ -42,6 +52,10 @@ export async function backfillKline(pair: TradingPair): Promise<IBackfillKlineRe
startTime: lastBackfillTime,
limit: 1000,
});
// fetchKlines 已返回,立即解除超时。
// 必须在 try 内清除:result.status 默认就是 'done'
// 若等 upsert 完成后再 clear,期间超时定时器会误判为失败。
clearTimeout(timer);
logger.info(`拉取到 ${klines.length} 条 K 线`);
if (klines.length > 0) {
await upsertOrUpdateKlines(klines);
@@ -55,11 +69,16 @@ export async function backfillKline(pair: TradingPair): Promise<IBackfillKlineRe
}
}
} catch (err) {
result.status = 'error';
result.error = err as Error;
// 先到先得:status 已被超时定时器修改则忽略
if (result.status === 'done') {
result.status = 'error';
result.error = err as Error;
}
logger.error({ err }, "拉取失败");
// 请求已失败,定时器无继续存在的意义。
// 同样必须在 catch 内清除,避免悬空的定时器回调修改 status。
clearTimeout(timer);
}
clearTimeout(timer);
await new Promise((resolve) => {
setTimeout(resolve, Math.random() * 1000);
});
+120
View File
@@ -0,0 +1,120 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { EventEmitter } from "node:events";
import { bus } from "../utils/bus";
// ============================================================
// bus.test.ts — TypedEventBus 事件系统单元测试
// ============================================================
// bus 是单例,每个测试前清空所有监听器保证隔离。
beforeEach(() => {
(bus as unknown as EventEmitter).removeAllListeners();
});
describe("TypedEventBus — on + emit", () => {
it("注册监听后 emit 触发回调", () => {
const fn = vi.fn();
bus.on("ws:connected", fn);
bus.emit("ws:connected");
expect(fn).toHaveBeenCalledTimes(1);
});
it("emit 多次触发多次", () => {
const fn = vi.fn();
bus.on("ws:connected", fn);
bus.emit("ws:connected");
bus.emit("ws:connected");
bus.emit("ws:connected");
expect(fn).toHaveBeenCalledTimes(3);
});
it("同一事件多个监听器全部触发", () => {
const fn1 = vi.fn();
const fn2 = vi.fn();
const fn3 = vi.fn();
bus.on("ws:disconnected", fn1);
bus.on("ws:disconnected", fn2);
bus.on("ws:disconnected", fn3);
bus.emit("ws:disconnected");
expect(fn1).toHaveBeenCalledTimes(1);
expect(fn2).toHaveBeenCalledTimes(1);
expect(fn3).toHaveBeenCalledTimes(1);
});
it("不同事件互不干扰", () => {
const connectedFn = vi.fn();
const disconnectedFn = vi.fn();
bus.on("ws:connected", connectedFn);
bus.on("ws:disconnected", disconnectedFn);
bus.emit("ws:connected");
expect(connectedFn).toHaveBeenCalledTimes(1);
expect(disconnectedFn).not.toHaveBeenCalled();
});
it("空事件(无载荷)回调参数为空数组", () => {
const fn = vi.fn();
bus.on("ws:stale", fn);
bus.emit("ws:stale");
expect(fn).toHaveBeenCalledWith();
});
it("有载荷事件回调收到完整参数", () => {
const fn = vi.fn();
bus.on("system:error", fn);
const payload = { source: "test", error: new Error("boom"), context: { key: "val" } };
bus.emit("system:error", payload);
expect(fn).toHaveBeenCalledWith(payload);
expect(fn.mock.calls[0]?.[0]).toHaveProperty("source", "test");
expect(fn.mock.calls[0]?.[0]).toHaveProperty("context.key", "val");
});
it("emit 无监听器不抛异常", () => {
expect(() => bus.emit("ws:stale")).not.toThrow();
});
});
describe("TypedEventBus — once", () => {
it("once 只触发一次", () => {
const fn = vi.fn();
bus.once("ws:connected", fn);
bus.emit("ws:connected");
bus.emit("ws:connected");
expect(fn).toHaveBeenCalledTimes(1);
});
it("once 和 on 混用", () => {
const onceFn = vi.fn();
const onFn = vi.fn();
bus.once("ws:connected", onceFn);
bus.on("ws:connected", onFn);
bus.emit("ws:connected");
bus.emit("ws:connected");
expect(onceFn).toHaveBeenCalledTimes(1);
expect(onFn).toHaveBeenCalledTimes(2);
});
});
describe("TypedEventBus — off", () => {
it("off 移除后不再触发", () => {
const fn = vi.fn();
bus.on("ws:connected", fn);
bus.off("ws:connected", fn);
bus.emit("ws:connected");
expect(fn).not.toHaveBeenCalled();
});
it("off 只移除指定 listener,不影响其他", () => {
const fn1 = vi.fn();
const fn2 = vi.fn();
bus.on("ws:connected", fn1);
bus.on("ws:connected", fn2);
bus.off("ws:connected", fn1);
bus.emit("ws:connected");
expect(fn1).not.toHaveBeenCalled();
expect(fn2).toHaveBeenCalledTimes(1);
});
it("off 未注册的 listener 不抛异常", () => {
expect(() => bus.off("ws:connected", vi.fn())).not.toThrow();
});
});
+217
View File
@@ -0,0 +1,217 @@
import { describe, it, expect } from "vitest";
import { validateConfig } from "../config/validators";
// ============================================================
// config.test.ts — 配置校验单元测试
// ============================================================
// 被测函数:validateConfig(raw: unknown) → EnvConfig
// 内部 assertString / assertPort / assertBoolean / assertEnum 也通过此函数覆盖。
function makeValidConfig() {
return {
db: {
host: "localhost",
port: 5432,
name: "trade",
user: "admin",
password: "secret",
},
redis: {
url: "redis://localhost:6379",
publish_enabled: false,
},
exchange: {
binance: {
api_key: "test-api-key",
api_secret: "test-api-secret",
},
binance_futures: {
api_key: "test-futures-key",
api_secret: "test-futures-secret",
},
},
logging: {
level: "info",
node_env: "test",
},
};
}
// ============================================================
describe("validateConfig — 合法配置", () => {
it("完整合法配置返回类型安全对象", () => {
const config = validateConfig(makeValidConfig());
expect(config.db.host).toBe("localhost");
expect(config.db.port).toBe(5432);
expect(config.db.name).toBe("trade");
expect(config.db.user).toBe("admin");
expect(config.db.password).toBe("secret");
expect(config.redis.url).toBe("redis://localhost:6379");
expect(config.redis.publish_enabled).toBe(false);
expect(config.exchange.binance.api_key).toBe("test-api-key");
expect(config.exchange.binance.api_secret).toBe("test-api-secret");
expect(config.exchange.binance_futures.api_key).toBe("test-futures-key");
expect(config.exchange.binance_futures.api_secret).toBe("test-futures-secret");
expect(config.logging.level).toBe("info");
expect(config.logging.node_env).toBe("test");
});
it("合法日志级别全部通过", () => {
const levels = ["trace", "debug", "info", "warn", "error", "fatal"] as const;
for (const level of levels) {
const raw = makeValidConfig();
raw.logging.level = level;
expect(validateConfig(raw).logging.level).toBe(level);
}
});
it("合法 node_env 全部通过", () => {
for (const env of ["development", "production", "test"] as const) {
const raw = makeValidConfig();
raw.logging.node_env = env;
expect(validateConfig(raw).logging.node_env).toBe(env);
}
});
});
// ============================================================
describe("validateConfig — 顶层结构校验", () => {
it("null 输入抛错", () => {
expect(() => validateConfig(null)).toThrow("顶层必须为 object");
});
it("非 object 输入抛错", () => {
expect(() => validateConfig("string")).toThrow();
expect(() => validateConfig(123)).toThrow();
});
it("缺失 db 抛错", () => {
const { db, ...rest } = makeValidConfig();
expect(() => validateConfig(rest)).toThrow("缺少 db");
});
it("缺失 redis 抛错", () => {
const { redis, ...rest } = makeValidConfig();
expect(() => validateConfig(rest)).toThrow("缺少 redis");
});
it("缺失 exchange 抛错", () => {
const { exchange, ...rest } = makeValidConfig();
expect(() => validateConfig(rest)).toThrow("缺少 exchange");
});
it("缺失 logging 抛错", () => {
const { logging, ...rest } = makeValidConfig();
expect(() => validateConfig(rest)).toThrow("缺少 logging");
});
});
// ============================================================
describe("validateConfig — db 字段校验", () => {
it("db.host 空字符串抛错", () => {
const raw = makeValidConfig();
raw.db.host = "";
expect(() => validateConfig(raw)).toThrow("必须为非空字符串");
});
it("db.host 为 number 抛错", () => {
const raw = makeValidConfig();
raw.db.host = 123 as unknown as string;
expect(() => validateConfig(raw)).toThrow();
});
it("db.port 为 0 抛错", () => {
const raw = makeValidConfig();
raw.db.port = 0;
expect(() => validateConfig(raw)).toThrow("有效端口号");
});
it("db.port 为 65536 抛错", () => {
const raw = makeValidConfig();
raw.db.port = 65536;
expect(() => validateConfig(raw)).toThrow("有效端口号");
});
it("db.port 为 'abc' 抛错", () => {
const raw = makeValidConfig();
raw.db.port = "abc" as unknown as number;
expect(() => validateConfig(raw)).toThrow("有效端口号");
});
it("db.port 为合法字符串 '5432' 转为 number 5432", () => {
const raw = makeValidConfig();
raw.db.port = "5432" as unknown as number;
const config = validateConfig(raw);
expect(config.db.port).toBe(5432);
expect(typeof config.db.port).toBe("number");
});
});
// ============================================================
describe("validateConfig — logging 字段校验", () => {
it("logging.level 非法值抛错", () => {
const raw = makeValidConfig();
raw.logging.level = "verbose" as "info";
expect(() => validateConfig(raw)).toThrow("必须为 trace | debug | info | warn | error | fatal 之一");
});
it("logging.node_env 非法值抛错", () => {
const raw = makeValidConfig();
raw.logging.node_env = "staging" as "test";
expect(() => validateConfig(raw)).toThrow();
});
});
// ============================================================
describe("validateConfig — redis 字段校验", () => {
it("redis.publish_enabled 字符串 'true' 转为 boolean true", () => {
const raw = makeValidConfig();
(raw.redis as Record<string, unknown>).publish_enabled = "true";
expect(validateConfig(raw).redis.publish_enabled).toBe(true);
});
it("redis.publish_enabled 字符串 'false' 转为 boolean false", () => {
const raw = makeValidConfig();
(raw.redis as Record<string, unknown>).publish_enabled = "false";
expect(validateConfig(raw).redis.publish_enabled).toBe(false);
});
it("redis.publish_enabled 不可转换值抛错", () => {
const raw = makeValidConfig();
(raw.redis as Record<string, unknown>).publish_enabled = 123;
expect(() => validateConfig(raw)).toThrow("必须为 boolean");
});
});
// ============================================================
describe("validateConfig — exchange 字段校验", () => {
it("缺失 binance 抛错", () => {
const raw = makeValidConfig();
delete (raw.exchange as Record<string, unknown>)["binance"];
expect(() => validateConfig(raw)).toThrow("缺少 binance");
});
it("缺失 binance_futures 抛错", () => {
const raw = makeValidConfig();
delete (raw.exchange as Record<string, unknown>)["binance_futures"];
expect(() => validateConfig(raw)).toThrow("缺少 binance_futures");
});
it("binance.api_key 空字符串抛错", () => {
const raw = makeValidConfig();
raw.exchange.binance.api_key = "";
expect(() => validateConfig(raw)).toThrow("必须为非空字符串");
});
it("binance.api_secret 空字符串抛错", () => {
const raw = makeValidConfig();
raw.exchange.binance.api_secret = "";
expect(() => validateConfig(raw)).toThrow("必须为非空字符串");
});
});
+215
View File
@@ -0,0 +1,215 @@
import { describe, it, expect } from "vitest";
import { convertBinanceKline } from "../exchanges/binance/rest";
import type { Kline, KlineInterval, PairType } from "../types";
// ============================================================
// convert-kline.test.ts — Binance K 线数据转换单元测试
// ============================================================
// 被测函数:convertBinanceKline(raw, symbol, interval, type) → Kline
//
// Binance SDK Kline 类型为 12 元组(Kline 类型来自 binance 包):
// [0] openTime: number
// [1] open: number | string
// [2] high: number | string
// [3] low: number | string
// [4] close: number | string
// [5] volume: number | string
// [6] closeTime: number
// [7] quoteVolume: number | string
// [8] tradeCount: number
// [9] takerBuyBaseVol: number | string
// [10] takerBuyQuoteVol: number | string
// [11] ignore: number | string
//
// 注:raw 用 as 断言绕过 binance SDK 的完整 Kline 类型,
// 因为我们只需 12 元组子集就足够测试转换逻辑。
type RawTuple = [
number, // 0: openTime
number | string, // 1: open
number | string, // 2: high
number | string, // 3: low
number | string, // 4: close
number | string, // 5: volume
number, // 6: closeTime
number | string, // 7: quoteVolume
number, // 8: tradeCount
number | string, // 9: takerBuyBaseVol
number | string, // 10: takerBuyQuoteVol
number | string, // 11: ignore
];
function raw(values: RawTuple) {
return values as unknown as Parameters<typeof convertBinanceKline>[0];
}
const SYMBOL = "BTCUSDT";
const INTERVAL: KlineInterval = "1m";
const TYPE: PairType = "spot";
function makeBaseRaw(): RawTuple {
return [
1700000000000, // openTime
"42000.50", // open
"42100.00", // high
"41900.00", // low
"42050.75", // close
"123.456", // volume
1700000059999, // closeTime
"5185000.00", // quoteVolume
5000, // tradeCount
"60.123", // takerBuyBaseVol
"2520000.00", // takerBuyQuoteVol
"0", // ignore
];
}
// ============================================================
describe("convertBinanceKline — 基本字段映射", () => {
it("exchange 固定为 binance", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.exchange).toBe("binance");
});
it("symbol 透传", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), "ETHUSDT", INTERVAL, TYPE);
expect(k.symbol).toBe("ETHUSDT");
});
it("interval 透传", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, "1m", TYPE);
expect(k.interval).toBe("1m");
});
it("type 透传 spot", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, "spot");
expect(k.type).toBe("spot");
});
it("type 透传 um", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, "um");
expect(k.type).toBe("um");
});
it("openTime / closeTime 为 number,值原样保留", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.openTime).toBe(1700000000000);
expect(k.closeTime).toBe(1700000059999);
expect(typeof k.openTime).toBe("number");
expect(typeof k.closeTime).toBe("number");
});
it("isClosed 始终为 trueREST 返回已闭合 K 线)", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.isClosed).toBe(true);
});
});
describe("convertBinanceKline — 价格字段 String() 转换", () => {
it("价格字段为 string 时原样保留", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.open).toBe("42000.50");
expect(k.high).toBe("42100.00");
expect(k.low).toBe("41900.00");
expect(k.close).toBe("42050.75");
});
it("价格字段为 number 时转为 string", () => {
const r: RawTuple = [
1700000000000,
42000, // open: number
42100, // high: number
41900, // low: number
42050, // close: number
123, // volume: number
1700000059999,
5185000, // quoteVolume: number
5000,
60, // takerBuyBaseVol: number
2520000, // takerBuyQuoteVol: number
"0",
];
const k = convertBinanceKline(raw(r), SYMBOL, INTERVAL, TYPE);
expect(k.open).toBe("42000");
expect(k.high).toBe("42100");
expect(k.low).toBe("41900");
expect(k.close).toBe("42050");
expect(typeof k.open).toBe("string");
});
it("价格字段为 0 时转为 '0'", () => {
const r: RawTuple = [
1700000000000, 0, 0, 0, 0, 0, 1700000059999, 0, 0, 0, 0, "0",
];
const k = convertBinanceKline(raw(r), SYMBOL, INTERVAL, TYPE);
expect(k.open).toBe("0");
expect(k.volume).toBe("0");
});
});
describe("convertBinanceKline — 扩展字段", () => {
it("volume 转换", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.volume).toBe("123.456");
});
it("quoteVolume 转换", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.quoteVolume).toBe("5185000.00");
});
it("tradeCount 转为 string", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.tradeCount).toBe("5000");
expect(typeof k.tradeCount).toBe("string");
});
it("takerBuyBaseVol 转换", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.takerBuyBaseVol).toBe("60.123");
});
it("takerBuyQuoteVol 转换", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
expect(k.takerBuyQuoteVol).toBe("2520000.00");
});
});
describe("convertBinanceKline — 边界与特殊值", () => {
it("openTime 为 0", () => {
const r: RawTuple = [
0, "1", "1", "1", "1", "1", 59000, "1", 1, "1", "1", "0",
];
const k = convertBinanceKline(raw(r), SYMBOL, INTERVAL, TYPE);
expect(k.openTime).toBe(0);
});
it("大精度价格保留", () => {
const r: RawTuple = [
1700000000000,
"999999999.99999999",
"999999999.99999999",
"0.00000001",
"50000.12345678",
"0.00001234",
1700000059999,
"0.00000001",
1,
"0",
"0",
"0",
];
const k = convertBinanceKline(raw(r), SYMBOL, INTERVAL, TYPE);
expect(k.open).toBe("999999999.99999999");
expect(k.low).toBe("0.00000001");
expect(k.close).toBe("50000.12345678");
});
it("第 12 位 ignore 字段不出现在结果中", () => {
const k = convertBinanceKline(raw(makeBaseRaw()), SYMBOL, INTERVAL, TYPE);
// Kline 接口不包含 ignore 字段,编译期已保证不泄露;
// 运行时确认返回对象中确实不存在名为 ignore 的属性
expect(Object.keys(k)).not.toContain("ignore");
});
});
+67
View File
@@ -0,0 +1,67 @@
import { describe, it, expect } from "vitest";
import { getNowMinuteMS, getPrevMinuteMS, wait } from "../utils";
// ============================================================
// utils.test.ts — 工具函数单元测试
// ============================================================
describe("getNowMinuteMS", () => {
it("返回的时间戳的秒和毫秒均为 0", () => {
const result = getNowMinuteMS();
const d = new Date(result);
expect(d.getSeconds()).toBe(0);
expect(d.getMilliseconds()).toBe(0);
});
it("不超过当前时间(分钟边界 ≤ now)", () => {
const now = Date.now();
const result = getNowMinuteMS();
expect(result).toBeLessThanOrEqual(now);
});
it("结果距当前时间不足 60 秒(最近过去的分钟边界)", () => {
const now = Date.now();
const result = getNowMinuteMS();
expect(now - result).toBeLessThan(60000);
});
it("结果可被 60000 整除", () => {
const result = getNowMinuteMS();
expect(result % 60000).toBe(0);
});
});
describe("getPrevMinuteMS", () => {
it("精确等于 getNowMinuteMS() - 60000", () => {
// 在同一毫秒内连续调用,避免跨分钟边界
const now = getNowMinuteMS();
const prev = getPrevMinuteMS();
expect(prev).toBe(now - 60000);
});
it("秒和毫秒均为 0", () => {
const d = new Date(getPrevMinuteMS());
expect(d.getSeconds()).toBe(0);
expect(d.getMilliseconds()).toBe(0);
});
});
describe("wait", () => {
it("等待约 100ms 后 resolve(耗时 ≥ 80ms", async () => {
const start = Date.now();
await wait(100);
expect(Date.now() - start).toBeGreaterThanOrEqual(80);
});
it("等待约 100ms 后 resolve(耗时 ≤ 200ms,宽松边界)", async () => {
const start = Date.now();
await wait(100);
expect(Date.now() - start).toBeLessThan(200);
});
it("wait(0) 立即 resolve(耗时 < 50ms", async () => {
const start = Date.now();
await wait(0);
expect(Date.now() - start).toBeLessThan(50);
});
});
+83
View File
@@ -0,0 +1,83 @@
# 技术债务清单
> data/ 模块工程质量评估产物,按优先级排列。修复后勾选。
## 🔴 P0 — 立即修复
- [x] **1. API 密钥明文存储**
- 位置:`data/env.yaml:28-29`
- 问题:真实 Binance API Key/Secret 明文写在 YAML 中,已提交 git
- 决策:**不修复**。理由:① 私有仓库,无泄露风险;② env.yaml 已是唯一配置源,key 集中化管理;③ 生产部署(Docker 等)通过外部 volume 挂载覆盖;④ 该 key 仅读取权限,账户无余额。当前方案满足现阶段安全需求
- [x] **2. 回补超时竞态**
- 位置:`data/service/backfill.ts:32-34`
- 问题:`setTimeout` 回调与 async try 块共写同一个 `result` 对象,存在竞态
- 修复:超时回调和 catch 块各加 `if (result.status === 'done')` 守卫,"先到先得"。同时补充注释说明"任何错误都退出重启"的设计决策(Binance SDK 僵死问题)
- [x] **3. 零测试**
- 位置:全局(无 `.test.ts` 文件)
- 问题:`vitest` 依赖和 scripts 已配置但从未使用,核心模块覆盖率 0%
- 方案:详见 [docs/testing.md](testing.md),首批 L1+L2 共 4 文件 63 用例已全部通过
## 🟡 P1 — 尽快修复
- [ ] **4. WS 客户端无 `error` 事件监听**
- 位置:`data/exchanges/binance/ws.ts:41-42`
- 问题:仅监听 `open` / `close`,未监听 `error`。SDK 内部异常(如解析失败、协议错误)静默丢失
- 建议:添加 `this.ws.on("error", ...)` 并 emit `system:error`
- [ ] **5. REST 请求无显式 HTTP 错误处理**
- 位置:`data/exchanges/binance/rest.ts:109-115`
- 问题:依赖 Binance SDK 内部抛出,网络错误、HTTP 错误、空结果三者无法区分,调用方误判
- 建议:包裹 SDK 调用,区分网络错误 / HTTP 错误 / 空结果,分别处理
- [ ] **6. 日志调用位置解析无条件执行**
- 位置:`data/utils/logger.ts:28`
- 问题:每条日志执行 `new Error().stack` 解析调用位置,生产环境高频日志下 CPU 开销 0.1-0.5ms/条
- 建议:仅在 `logging.nodeEnv === "development"` 时执行
- [ ] **7. `exchange.entity.ts` 中 `tradingPairs` 类型错误**
- 位置:`data/db/entities/exchange.entity.ts:40`
- 问题:`tradingPairs!: unknown[]` 应为 `TradingPair[]``unknown[]` 丢失类型信息
- 建议:改为 `TradingPair[]` 并确保 import 正确
## 🟢 P2 — 计划改进
- [ ] **8. ARCHITECTURE.md 引用不存在的文件/模块**
- 位置:`data/ARCHITECTURE.md` 多处
- 问题:`run/main.ts``pipeline/``publisher/``utils/retry.ts``exchanges/types.ts``db/entities/klines/` 均不存在。架构图显示 RxJS 管道但代码中未使用
- 建议:要么创建缺失模块,要么更新 ARCHITECTURE.md 反映当前简化架构
- [ ] **9. `exchanges/index.ts` 职责过重**
- 位置:`data/exchanges/index.ts`
- 问题:同时承担 REST client registry7-30 行)和 WS 订阅管理(74-121 行),两类逻辑耦合
- 建议:拆分为 `rest-registry.ts` + `ws-manager.ts`
- [ ] **10. `kline:tick` 微回补未实现**(已知问题 #7
- 位置:`data/exchanges/binance/ws.ts:62-63` + `data/run/start.ts`
- 问题:WS 订阅后的衔接窗口期(回补完成 → subscribe 之间若跨分钟边界)可能导致最多 1 分钟数据缺口。文档方案已明确,待实现
- 建议:`ws.ts``final === false` 时 emit `kline:tick``start.ts` 首次 tick 触发微回补
- [ ] **11. `fetchMarkets()` 返回空数组**
- 位置:`data/exchanges/binance/rest.ts:133,194`
- 问题:TODO 未实现,静默返回 `[]`,调用方无感知
- 建议:实现交易所信息拉取或明确声明暂不支持并抛异常
- [ ] **12. 无批量写入缓冲**
- 位置:`data/run/start.ts:71`
- 问题:每条 `kline:update` 直接 `upsertOrUpdateKlines([kline])`,每个交易对每分钟 1 次 DB 写入。4 个交易对无问题,100+ 交易对可能成为瓶颈
- 建议:参考 ARCHITECTURE.md 的 `KlineWriter` 设计,实现 500 条/1s 批量缓冲
- [ ] **13. `aggregate.ts` 混用 `console.log`**
- 位置:`data/service/aggregate.ts:25`
- 问题:TODO stub 使用 `console.log` 而非 `logger`,日志格式不统一
- 建议:替换为 `logger.debug``logger.info`
- [ ] **14. `test.ts` 位置不当**
- 位置:`data/test.ts`
- 问题:手动 smoke test 放在根目录,与自动化测试混在一起
- 建议:移入 `data/tests/manual/` 或重命名为 `smoke-test.ts`
---
> 评估时间:2026-06-18 | 综合评分 B | 详见评估报告
+151
View File
@@ -0,0 +1,151 @@
# data/ 模块测试方案
## 一、总目标
建立 data/ 模块核心单元测试,从 0% 覆盖率起步,优先覆盖纯函数和高价值逻辑,逐步扩展到需要 mock 的模块。
## 二、测试基础设施
| 项 | 值 |
|----|-----|
| 测试框架 | vitest(已在 `package.json` devDependencies |
| 运行环境 | Bun(与项目运行时一致) |
| 运行命令 | `bun run test` / `bun run test:watch` |
| 文件位置 | `data/tests/` 目录 |
| 文件命名 | `*.test.ts`,与源模块对应 |
## 三、分层策略
```
L1 纯函数 → 无依赖,直接测 → 快速、收益最高
L2 事件系统 → TypedEventBus → 验证事件发布订阅机制
L3 需 mock → DB/service 层 → mock DataSource / fetchKlines
L4 集成测试 → 全链路 → 需 DB + WS,暂不纳入
```
先完成 L1 + L2(首批 4 个测试文件),L3 在 L1/L2 跑通后追加。
## 四、首批测试文件
| 文件 | 被测模块 | 层级 | 用例数(预估) |
|------|----------|:--:|:--------------:|
| `tests/utils.test.ts` | `getNowMinuteMS``getPrevMinuteMS``wait` | L1 | ~8 |
| `tests/bus.test.ts` | `TypedEventBus`on/once/emit/off | L2 | ~10 |
| `tests/convert-kline.test.ts` | `convertBinanceKline` | L1 | ~15 |
| `tests/config.test.ts` | `validateConfig` + `assert*` 函数 | L1 | ~20 |
### 4.1 `tests/utils.test.ts`
**被测函数**`data/utils/index.ts` 中的 `getNowMinuteMS``getPrevMinuteMS``wait`
| # | 用例 | 输入 | 预期 |
|---|------|------|------|
| 1 | 当前分钟起始时间戳的毫秒和秒为 0 | `getNowMinuteMS()` | `new Date(result).getSeconds() === 0``getMilliseconds() === 0` |
| 2 | 返回值为最近过去的分钟边界 | `getNowMinuteMS()` | `result <= Date.now()` |
| 3 | 结果与当前时间的差值小于 60s | `getNowMinuteMS()` | `Date.now() - result < 60000` |
| 4 | prevMinute = nowMinute - 60s | `getPrevMinuteMS()` | `result === getNowMinuteMS() - 60000` |
| 5 | prevMinute 的毫秒和秒也为 0 | — | 同 #1 |
| 6 | wait(100) 至少等 80ms | `await wait(100)` | 耗时 ≥ 80ms |
| 7 | wait(100) 不超过 200ms | `await wait(100)` | 耗时 ≤ 200ms(宽松) |
| 8 | wait(0) 立即 resolve | `await wait(0)` | 耗时 < 50ms |
### 4.2 `tests/bus.test.ts`
**被测模块**`data/utils/bus.ts` 中的 `TypedEventBus``on``once``emit``off`
| # | 用例 | 操作 | 预期 |
|---|------|------|------|
| 1 | 注册监听 → emit → 触发 | `bus.on("ws:connected", fn)` + `bus.emit("ws:connected")` | `fn` 被调用 1 次 |
| 2 | `once` 只触发一次 | `bus.once("ws:connected", fn)` + emit 两次 | `fn` 被调用 1 次 |
| 3 | `off` 移除后不再触发 | `bus.on(...)``bus.off(...)` → emit | `fn` 未被调用 |
| 4 | 多参数事件 | `bus.on("pair:ready", fn)` + emit `{symbol, type, exchange}` | `fn` 收到完整的 3 个字段 |
| 5 | 多个监听器同时触发 | 注册 3 个 `bus.on("kline:update", ...)` → emit | 3 个全部触发 |
| 6 | emit 无监听器不报错 | `bus.emit("ws:stale")` 无任何监听 | 不抛异常 |
| 7 | `system:error` 事件带 context | emit `{source, error, context}` | `fn` 收到完整 payload |
| 8 | 空事件(无载荷) | `bus.emit("ws:connected")` | `fn` 被调用,参数为空数组 |
| 9 | `off` 未注册的 listener 不报错 | — | 不抛异常 |
> **注意**:`vitest` 默认并行执行测试文件,bus 是单例,多个测试文件可能互相干扰。使用 `beforeEach` 清空监听器隔离。
### 4.3 `tests/convert-kline.test.ts`
**被测函数**`data/exchanges/binance/rest.ts` 中的 `convertBinanceKline`
Binance SDK `Kline` 类型为 12 元组:
```
[openTime, open, high, low, close, volume, closeTime, quoteVolume, tradeCount, takerBuyBaseVol, takerBuyQuoteVol, ignore]
```
其中价格字段为 `string | number`
| # | 用例 | 输入 | 预期 |
|---|------|------|------|
| 1 | 标准 1m K 线转换 | 完整 12 元组,价格字段为 string | 返回 Kline 对象,各字段与输入对应 |
| 2 | 价格字段为 number 类型 | open/high/... 传 number | `String()` 转换后均为 string |
| 3 | `isClosed` = true | 任意有效输入 | `.isClosed === true` |
| 4 | `exchange` = `"binance"` | — | `.exchange === "binance"` |
| 5 | `symbol` 透传 | `symbol: "BTCUSDT"` | `.symbol === "BTCUSDT"` |
| 6 | `type` 透传 | `type: "spot"` / `"um"` | 原样透传 |
| 7 | `interval` 透传 | `interval: "1m"` | `.interval === "1m"` |
| 8 | openTime/closeTime 为 number | — | 类型为 `number`,值原样保留 |
| 9 | tradeCount 为 string | 传入 number `123` | `String(123)``"123"` |
| 10 | takerBuyBaseVol 非空 | 传入 `"1.5"` | `"1.5"` |
| 11 | takerBuyQuoteVol 非空 | 传入 `"50000"` | `"50000"` |
| 12 | volume 为 "0" | 传入 `0` | `"0"` |
| 13 | 第 12 位 ignore 字段被丢弃 | 传入任意值 | 不出现于返回对象中 |
| 14 | openTime 为 0 | `openTime: 0` | `.openTime === 0`(合法边界) |
| 15 | 大量价格字段 | 大数字如 `"999999999.99999999"` | 精确字符串保留 |
### 4.4 `tests/config.test.ts`
**被测函数**`data/config/validators.ts` 中的 `validateConfig``assertString``assertPort``assertBoolean``assertEnum`
| # | 用例 | 输入 | 预期 |
|---|------|------|------|
| 1 | 完整合法配置 | 合法 EnvConfig 对象 | 返回类型安全对象 |
| 2 | `null` 输入 | `null` | 抛错含 "顶层必须为 object" |
| 3 | 非 object 输入 | `"string"` / `123` | 抛错 |
| 4 | 缺失 `db` | `{redis: ..., exchange: ..., logging: ...}` | 抛错含 "缺少 db" |
| 5 | 缺失 `redis` | — | 抛错含 "缺少 redis" |
| 6 | 缺失 `exchange` | — | 抛错含 "缺少 exchange" |
| 7 | 缺失 `logging` | — | 抛错含 "缺少 logging" |
| 8 | `db.host` 为空字符串 | `host: ""` | 抛错含 "必须为非空字符串" |
| 9 | `db.host` 为 number | `host: 123` | 抛错 |
| 10 | `db.port` 为 0 | `port: 0` | 抛错含 "有效端口号" |
| 11 | `db.port` 为 65536 | `port: 65536` | 抛错含 "有效端口号" |
| 12 | `db.port` 为 "abc" | `port: "abc"` | 抛错 |
| 13 | `db.port` 为合法字符串 | `port: "5432"` | 返回 `5432`number |
| 14 | `logging.level` 非法 | `level: "verbose"` | 抛错含 "必须为 trace...之一" |
| 15 | `logging.node_env` 非法 | `node_env: "staging"` | 抛错 |
| 16 | `redis.publish_enabled` 字符串 `"true"` | — | 返回 `true`boolean |
| 17 | `redis.publish_enabled` 字符串 `"false"` | — | 返回 `false` |
| 18 | `redis.publish_enabled` 非 boolean 不可转换 | `publish_enabled: 123` | 抛错 |
| 19 | 缺失 `binance` | exchange 中无 binance 字段 | 抛错含 "缺少 binance" |
| 20 | 缺失 `binance_futures` | — | 抛错含 "缺少 binance_futures" |
## 五、实施步骤
| 步骤 | 内容 | 状态 |
|------|------|:--:|
| 1 | `tests/utils.test.ts` — getNowMinuteMS / getPrevMinuteMS / wait (9 cases) | ✅ |
| 2 | `tests/bus.test.ts` — TypedEventBus on/once/emit/off (12 cases) | ✅ |
| 3 | `tests/convert-kline.test.ts` — Binance 12 元组 → Kline (18 cases) | ✅ |
| 4 | `tests/config.test.ts` — validateConfig + assert* (24 cases) | ✅ |
首批 L1+L2 共 4 文件 63 用例全部通过。
## 六、后续(L3
L1/L2 跑通后,追加需要 mock 的测试:
| 文件 | 被测模块 | mock 对象 |
|------|----------|-----------|
| `tests/kline.test.ts` | `upsertOrUpdateKlines` | `AppDataSource.getRepository` |
| `tests/backfill.test.ts` | `backfillKline` | `fetchKlines``upsertOrUpdateKlines``getAllPairs` |
## 七、CI 接入(将来)
```bash
bun run test
```
不引入覆盖率门槛,先保证有测试可跑,后续设定 ≥60% 行覆盖率目标。