diff --git a/data/db/data-source.ts b/data/db/data-source.ts index 84674b5..aa6c7ed 100644 --- a/data/db/data-source.ts +++ b/data/db/data-source.ts @@ -1,7 +1,9 @@ import { DataSource } from "typeorm"; -import { pgsql } from "../config"; +import { pgsql, printConfigSummary } from "../config"; import * as entities from "./entities"; +printConfigSummary(); + export const AppDataSource = new DataSource({ type: "postgres", host: pgsql.host, diff --git a/data/exchanges/rest.ts b/data/exchanges/rest.ts index 1a52d09..248c3c8 100644 --- a/data/exchanges/rest.ts +++ b/data/exchanges/rest.ts @@ -104,6 +104,8 @@ async function fetchBinanceKlines( const client = new MainClient({ api_key: 'ONSJKIGRpDYLn6FdV17aAKfjclZ4I2LzamflhuMpsoRQA427lLKeyJlGtg2RZ7DH', api_secret: '5Mfv4TgvDlRzCHbtl2nJL4mVHUvMm8pyjKiRjMoosBMxrhlqMw6CuQbg2qbS2Npd', + }, { + timeout: 3000, }); // Binance 硬限制:单次最多 1000 条 diff --git a/data/package.json b/data/package.json index b0f77ee..50e7b4c 100644 --- a/data/package.json +++ b/data/package.json @@ -32,8 +32,9 @@ "@types/ws": "^8.18.1", "eslint": "^10.4.1", "prettier": "^3.8.3", + "ts-node": "^10.9.2", "tsx": "^4.22.4", "typescript": "^6.0.3", "vitest": "^4.1.8" } -} \ No newline at end of file +} diff --git a/data/run/build_aggregates_sql.ts b/data/run/build_aggregates_sql.ts new file mode 100644 index 0000000..f946a45 --- /dev/null +++ b/data/run/build_aggregates_sql.ts @@ -0,0 +1,436 @@ +// ============================================================ +// build_aggregates_sql.ts — TimescaleDB 连续聚合逐月刷新脚本 +// ============================================================ +// 用途: +// 按月份粒度逐月刷新 klines 分层连续聚合物化视图链: +// 5m → 15m → 30m → 1h → 4h → 1d → 1w +// 每层依赖下一层的数据,因此严格按从低到高顺序刷新。 +// +// 使用方式: +// # 仅生成 SQL 不执行(dry-run,默认) +// bun run data/run/build_aggregates_sql.ts +// +// # 实际执行(连接数据库刷新) +// bun run data/run/build_aggregates_sql.ts --execute +// +// # 指定时间范围 +// bun run data/run/build_aggregates_sql.ts --start 2019-08 --end 2020-03 --execute +// +// 配置: +// 数据库连接信息从项目根目录 env.yaml 读取(通过 ../config 模块)。 +// 确保 env.yaml 中 db 段配置正确。 +// +// 风险提示: +// - refresh_continuous_aggregate 会获取表级锁,建议在低流量时段执行 +// - 逐月刷新避免单次锁定时间过长,每层每个月的刷新是独立事务 +// - 如果某层某月刷新失败,脚本会记录错误并继续处理后续月份 +// ============================================================ + +import { logger } from "../utils/logger"; +// AppDataSource 通过动态导入延迟加载,避免 dry-run 模式下触发数据库连接 + +// ============================================================ +// 1. 常量定义 +// ============================================================ + +/** + * 分层聚合视图链(按依赖顺序:低层级 → 高层级) + * + * 刷新顺序至关重要: + * klines_5m 源数据来自 klines(1m 基表) + * klines_15m 源数据来自 klines_5m + * klines_30m 源数据来自 klines_15m + * klines_1h 源数据来自 klines_30m + * klines_4h 源数据来自 klines_1h + * klines_1d 源数据来自 klines_4h + * klines_1w 源数据来自 klines_1d + * + * 必须严格按此顺序刷新,否则高层级聚合会缺少数据。 + */ +const AGGREGATE_VIEWS = [ + "klines_5m", + "klines_15m", + "klines_30m", + "klines_1h", + "klines_4h", + "klines_1d", + "klines_1w", +] as const; + +/** 默认起始年月 */ +const DEFAULT_START = { year: 2017, month: 8 }; // 2018-09 +/** 默认结束年月 */ +const DEFAULT_END = { year: 2026, month: 6 }; // 2019-01 + +// ============================================================ +// 2. 工具函数 +// ============================================================ + +/** 解析命令行参数为 key-value 映射 */ +function parseArgs(args: string[]): Map { + const map = new Map(); + for (let i = 0; i < args.length; i++) { + const arg = args[i]!; // 循环条件保证 i < args.length + if (arg.startsWith("--")) { + const key = arg.slice(2); + // 下一个参数如果不是 -- 开头,则为 value;否则是 boolean flag + const next = args[i + 1]; + if (next && !next.startsWith("--")) { + map.set(key, next); + i++; + } else { + map.set(key, "true"); + } + } + } + return map; +} + +/** + * 解析 "YYYY-MM" 格式字符串为 { year, month } + * 月份使用 1-based(与 JS Date 不同,便于人类阅读) + */ +function parseYearMonth(input: string): { year: number; month: number } | null { + const match = input.match(/^(\d{4})-(\d{2})$/); + if (!match) { + return null; + } + // match[1] / match[2] 在正则匹配成功时一定存在,使用 ! 断言 + const year = parseInt(match[1]!, 10); + const month = parseInt(match[2]!, 10); + if (month < 1 || month > 12) return null; + return { year, month }; +} + +/** + * 生成从 start 到 end(含)的所有年月序列 + * + * 例如 start=2018-09, end=2019-01 → [2018-09, 2018-10, 2018-11, 2018-12, 2019-01] + * + * @param start - 起始年月(1-based month) + * @param end - 结束年月(1-based month),包含该月 + * @returns 按时间升序排列的年月数组 + */ +function generateMonthRange( + start: { year: number; month: number }, + end: { year: number; month: number } +): Array<{ year: number; month: number }> { + const result: Array<{ year: number; month: number }> = []; + + let y = start.year; + let m = start.month; // 1-based + + // 将 end 转为总月份数,便于比较 + const endTotal = end.year * 12 + (end.month - 1); + + while (true) { + const currentTotal = y * 12 + (m - 1); + if (currentTotal > endTotal) break; + + result.push({ year: y, month: m }); + + // 递增月份 + m++; + if (m > 12) { + m = 1; + y++; + } + } + + return result; +} + +/** + * 为指定年月构建 refresh_continuous_aggregate SQL + * + * TimescaleDB 中 refresh_continuous_aggregate 的 window 是左闭右开: + * [window_start, window_end) + * 因此 window_start = 当月1日 00:00:00+00 + * window_end = 下月1日 00:00:00+00 + * + * @param viewName - 聚合视图名称,如 "klines_5m" + * @param year - 年份 + * @param month - 月份(1-based) + * @returns 可执行的 SQL 字符串 + */ +function buildRefreshSQL( + viewName: string, + year: number, + month: number +): string { + // 计算下个月的年月 + let nextMonth = month + 1; + let nextYear = year; + if (nextMonth > 12) { + nextMonth = 1; + nextYear = year + 1; + } + + const pad = (n: number) => String(n).padStart(2, "0"); + const windowStart = `${year}-${pad(month)}-01 00:00:00+00`; + const windowEnd = `${nextYear}-${pad(nextMonth)}-01 00:00:00+00`; + + return `CALL refresh_continuous_aggregate('${viewName}', '${windowStart}', '${windowEnd}');`; +} + +/** + * 为所有聚合视图生成某个指定月份的 SQL 语句列表 + * + * @returns 按依赖顺序排列的 SQL 语句数组 + */ +function buildMonthSQLBatch( + year: number, + month: number +): Array<{ view: string; sql: string }> { + return AGGREGATE_VIEWS.map((view) => ({ + view, + sql: buildRefreshSQL(view, year, month), + })); +} + +// ============================================================ +// 3. 核心执行逻辑 +// ============================================================ + +/** 单次刷新操作的统计 */ +interface RefreshStats { + /** 尝试执行的视图数 */ + totalViews: number; + /** 成功数 */ + success: number; + /** 失败数 */ + failed: number; + /** 失败的详情列表 */ + errors: Array<{ view: string; month: string; error: string }>; +} + +/** + * 执行单个月份所有聚合视图的刷新 + * + * 按依赖顺序依次执行,每个视图的刷新是独立事务。 + * 某个视图失败不会阻止后续视图的执行。 + * + * @param ds - TypeORM DataSource 实例 + * @param year - 年份 + * @param month - 月份(1-based) + * @returns 该月份的刷新统计 + */ +async function refreshSingleMonth( + ds: import("typeorm").DataSource, + year: number, + month: number +): Promise { + const pad = (n: number) => String(n).padStart(2, "0"); + const monthLabel = `${year}-${pad(month)}`; + const batch = buildMonthSQLBatch(year, month); + + const stats: RefreshStats = { + totalViews: batch.length, + success: 0, + failed: 0, + errors: [], + }; + + for (const { view, sql } of batch) { + try { + logger.info(`[refresh] ${monthLabel} | ${view} 开始刷新...`); + const startMs = Date.now(); + await ds.query(sql); + const elapsed = ((Date.now() - startMs) / 1000).toFixed(1); + logger.info(`[refresh] ${monthLabel} | ${view} 完成 (${elapsed}s)`); + stats.success++; + } catch (err: unknown) { + const errMsg = err instanceof Error ? err.message : String(err); + logger.error(`[refresh] ${monthLabel} | ${view} 失败: ${errMsg}`); + stats.failed++; + stats.errors.push({ + view, + month: monthLabel, + error: errMsg, + }); + } + } + + return stats; +} + +/** + * 汇总统计信息 + */ +function summarizeStats( + allStats: RefreshStats[], + totalMonths: number, + totalElapsedMs: number +): void { + const totalViews = allStats.reduce((sum, s) => sum + s.totalViews, 0); + const totalSuccess = allStats.reduce((sum, s) => sum + s.success, 0); + const totalFailed = allStats.reduce((sum, s) => sum + s.failed, 0); + const allErrors = allStats.flatMap((s) => s.errors); + + logger.info("========================================"); + logger.info("[refresh] 聚合刷新完成"); + logger.info(` 总月份数: ${totalMonths}`); + logger.info(` 总视图数: ${totalViews} (${AGGREGATE_VIEWS.length} 视图 × ${totalMonths} 月)`); + logger.info(` 成功: ${totalSuccess}`); + logger.info(` 失败: ${totalFailed}`); + logger.info(` 总耗时: ${(totalElapsedMs / 1000).toFixed(1)}s`); + logger.info("========================================"); + + if (allErrors.length > 0) { + logger.warn(`以下 ${allErrors.length} 次刷新失败,请检查:`); + for (const e of allErrors) { + logger.warn(` - ${e.month} / ${e.view}: ${e.error}`); + } + } +} + +// ============================================================ +// 4. Dry-Run 模式(仅生成 SQL,不连接数据库) +// ============================================================ + +/** + * 仅输出 SQL 语句,不连接数据库也不执行。 + * 适用于审查 SQL 或手动拷贝到 psql 执行。 + */ +function dryRun( + start: { year: number; month: number }, + end: { year: number; month: number } +): void { + const months = generateMonthRange(start, end); + const pad = (n: number) => String(n).padStart(2, "0"); + + console.log("-- ============================================================"); + console.log( + `-- TimescaleDB 连续聚合刷新 SQL(Dry-Run)` + ); + console.log( + `-- 时间范围: ${start.year}-${pad(start.month)} → ${end.year}-${pad(end.month)}` + ); + console.log(`-- 月份数: ${months.length}`); + console.log( + `-- 视图链: ${AGGREGATE_VIEWS.join(" → ")}` + ); + console.log("-- ============================================================\n"); + + for (const { year, month } of months) { + const monthLabel = `${year}-${pad(month)}`; + console.log(`-- [${monthLabel}] ----------------------------------------`); + for (const { view, sql } of buildMonthSQLBatch(year, month)) { + console.log(`-- ${view}`); + console.log(sql); + } + console.log(); + } + + console.log( + `-- 共生成 ${months.length * AGGREGATE_VIEWS.length} 条 SQL 语句` + ); + console.log( + "-- 使用 --execute 参数实际执行上述 SQL" + ); +} + +// ============================================================ +// 5. 主入口 +// ============================================================ + +async function main(): Promise { + const args = parseArgs(Bun.argv.slice(2)); // Bun.argv[0] = bun, [1] = script path + + // 解析时间范围参数 + const startRaw = args.get("start") ?? `${DEFAULT_START.year}-${String(DEFAULT_START.month).padStart(2, "0")}`; + const endRaw = args.get("end") ?? `${DEFAULT_END.year}-${String(DEFAULT_END.month).padStart(2, "0")}`; + + const start = parseYearMonth(startRaw); + const end = parseYearMonth(endRaw); + + if (!start) { + logger.error(`无效的起始年月: "${startRaw}",格式应为 YYYY-MM(如 2018-09)`); + process.exit(1); + } + if (!end) { + logger.error(`无效的结束年月: "${endRaw}",格式应为 YYYY-MM(如 2019-01)`); + process.exit(1); + } + + // 验证 start <= end + const startTotal = start.year * 12 + start.month; + const endTotal = end.year * 12 + end.month; + if (startTotal > endTotal) { + logger.error( + `起始时间 (${startRaw}) 不能晚于结束时间 (${endRaw})` + ); + process.exit(1); + } + + const months = generateMonthRange(start, end); + const pad = (n: number) => String(n).padStart(2, "0"); + + logger.info( + `[refresh] 时间范围: ${start.year}-${pad(start.month)} → ${end.year}-${pad(end.month)},共 ${months.length} 个月` + ); + + // Dry-run 模式:仅输出 SQL + const shouldExecute = args.has("execute"); + if (!shouldExecute) { + logger.info("[refresh] Dry-Run 模式:仅生成 SQL,不执行。添加 --execute 参数实际执行。"); + dryRun(start, end); + process.exit(0); + } + + // 执行模式:连接数据库逐月刷新 + logger.info("[refresh] 执行模式:连接数据库并逐月刷新聚合..."); + + // 动态导入 DataSource(仅在执行模式下连接数据库) + const { AppDataSource } = await import("../db/data-source"); + + // 确保 DataSource 已初始化 + if (!AppDataSource.isInitialized) { + await AppDataSource.initialize(); + logger.info("[refresh] 数据库连接已建立"); + } + + const allStats: RefreshStats[] = []; + const totalStartMs = Date.now(); + + for (let i = 0; i < months.length; i++) { + const { year, month } = months[i]!; // 循环条件保证 i < months.length + const monthLabel = `${year}-${pad(month)}`; + const progress = `[${i + 1}/${months.length}]`; + + logger.info(`\n${progress} 开始处理月份: ${monthLabel}`); + + const stats = await refreshSingleMonth(AppDataSource, year, month); + allStats.push(stats); + + // 单月汇总 + logger.info( + `${progress} ${monthLabel} 完成: ${stats.success}/${stats.totalViews} 成功` + + (stats.failed > 0 ? `, ${stats.failed} 失败` : "") + ); + } + + const totalElapsedMs = Date.now() - totalStartMs; + summarizeStats(allStats, months.length, totalElapsedMs); + + // 如果有失败,以非零退出码退出 + const totalFailed = allStats.reduce((sum, s) => sum + s.failed, 0); + if (totalFailed > 0) { + process.exit(1); + } + + // 优雅关闭数据库连接(仅在执行模式下) + if (AppDataSource.isInitialized) { + await AppDataSource.destroy(); + logger.info("[refresh] 数据库连接已关闭"); + } +} + +// ============================================================ +// 6. 启动 +// ============================================================ + +main().catch((err) => { + logger.error({ err }, "[refresh] 脚本执行异常"); + process.exit(1); +}); diff --git a/data/run_exchange.sh b/data/run_exchange.sh new file mode 100755 index 0000000..ef14128 --- /dev/null +++ b/data/run_exchange.sh @@ -0,0 +1 @@ +bun run ./run/exchange.ts \ No newline at end of file diff --git a/env.yaml b/env.yaml index fa6cf41..30b081e 100644 --- a/env.yaml +++ b/env.yaml @@ -10,7 +10,7 @@ # --- TimescaleDB / PostgreSQL 连接 --- db: - host: localhost + host: 10.0.0.7 port: 5432 name: trade user: trader