import { AppDataSource } from "../db/data-source"; import { Kline } from "../db/entities/kline.entity"; import type { Kline as KlineItem } from "../types"; import { logger } from "../utils/logger"; const repo = AppDataSource.getRepository(Kline); /** * 批量 UPSERT K 线数据到 TimescaleDB。 * * 映射应用层 KlineItem → 数据库实体,通过 INSERT ... ON CONFLICT DO UPDATE * 实现幂等写入。冲突列为 [exchange, symbol, interval, time](四列复合主键), * 冲突时更新 OHLCV 及扩展字段。 * * 适用场景: * - 回补历史 K 线(幂等,重复拉取不产生重复行) * - WebSocket 实时 K 线增量刷新(更新最新一根未闭合 K 线的 high/low/close/volume) * * 注意:依赖 Kline 实体的四列复合主键 [exchange, symbol, interval, time]。 * 若实体 PK 结构变更,需同步更新 conflictPaths。 * * @param KlineItems - 应用层标准化 K 线数组 */ export async function upsertOrUpdateKlines(KlineItems: KlineItem[]) { if (KlineItems.length === 0) { return; } logger.debug({ count: KlineItems.length }, "开始批量 UPSERT K 线"); // 应用层 KlineItem → 数据库实体 Kline // 注意类型转换:应用层价格为 string(兼容交易所 SDK),DB 层为 NUMERIC(number) const entities = KlineItems.map((item) => { const entity = new Kline(); entity.time = new Date(item.openTime); // Unix ms → Date entity.exchange = item.exchange; entity.symbol = item.symbol; entity.interval = item.interval; entity.open = Number(item.open); entity.high = Number(item.high); entity.low = Number(item.low); entity.close = Number(item.close); entity.volume = Number(item.volume); entity.quote_volume = item.quoteVolume ? Number(item.quoteVolume) : undefined; entity.taker_buy_base_vol = item.takerBuyBaseVol ? Number(item.takerBuyBaseVol) : undefined; entity.taker_buy_quote_vol = item.takerBuyQuoteVol ? Number(item.takerBuyQuoteVol) : undefined; entity.trade_count = item.tradeCount ? Number(item.tradeCount) : undefined; entity.is_closed = item.isClosed; return entity; }); try { // UPSERT: 冲突列匹配复合主键 [exchange, symbol, interval, time] // 实体已改为四列复合 PK,ON CONFLICT 直接命中主键约束 // skipUpdateIfNoValuesChanged: 减少不必要的写操作 const result = await repo.upsert(entities, { conflictPaths: ["exchange", "symbol", "interval", "time"], skipUpdateIfNoValuesChanged: true, }); logger.info( { count: KlineItems.length, generatedMaps: result.generatedMaps.length }, "K 线 UPSERT 完成", ); } catch (err) { logger.error({ err, count: KlineItems.length }, "K 线 UPSERT 失败"); throw err; } }