// ============================================================ // db/pg.ts — PostgreSQL / TimescaleDB 连接池管理 // ============================================================ // 职责: // 1. 基于 config.ts 的 pgsql 配置创建 pg.Pool 单例 // 2. 连接生命周期事件监听(connect / acquire / remove / error) // 3. 提供健康检查(healthCheck) // 4. 提供事务辅助函数(withTransaction) // 5. 进程退出时优雅关闭(SIGTERM / SIGINT) // // 使用方式: // import { pool } from "./db"; // 通过 index.ts 统一导出 // import { healthCheck } from "./db"; // const { rows } = await pool.query("SELECT NOW()"); // ============================================================ import pg from "pg"; import { readFileSync, readdirSync } from "node:fs"; import { resolve, dirname } from "node:path"; import { fileURLToPath } from "node:url"; import { pgsql } from "../config"; // ============================================================ // 1. 连接池创建(单例) // ============================================================ /** * pg.Pool 单例。 * 配置来源于 config.ts → pgsql,已包含连接数上限、超时等参数。 * * pg.Pool 内部使用懒连接——首次 query 时才建立连接, * 因此模块加载时不会立即连接数据库。 */ export const pool = new pg.Pool({ host: pgsql.host, port: pgsql.port, database: pgsql.database, user: pgsql.user, password: pgsql.password, max: pgsql.max, idleTimeoutMillis: pgsql.idleTimeoutMillis, connectionTimeoutMillis: pgsql.connectionTimeoutMillis, // TimescaleDB 特有的超时设置:分析查询可能较慢 statement_timeout: 30000, // 单条 SQL 最大执行 30s // application_name 便于在 pg_stat_activity 中识别 application_name: "trade-data", }); // ============================================================ // 2. 连接池事件监听(可观测性) // ============================================================ /** 新客户端连接建立 */ pool.on("connect", (client) => { // 为每个连接设置 TimescaleDB 优化参数 // 跳过 WAL 日志可加速批量写入(仅在可接受丢失最近几秒数据的场景) // client.query("SET timescaledb.enable_skip_scan = ON"); console.log(`[pg] 新连接建立 (total: ${pool.totalCount}, idle: ${pool.idleCount})`); }); /** 从池中获取连接 */ pool.on("acquire", () => { // 连接池耗尽时会频繁触发,可在此记录高负载信号 }); /** 连接归还池 */ pool.on("remove", () => { console.log(`[pg] 连接关闭 (total: ${pool.totalCount}, idle: ${pool.idleCount})`); }); /** * 空闲客户端出错(如网络中断、PG 重启)。 * pg.Pool 会自动移除问题连接并创建新连接,此处仅记录日志。 */ pool.on("error", (err: Error) => { console.error(`[pg] 连接池错误: ${err.message}`); // 不退出进程——连接池会自动恢复 }); // ============================================================ // 3. 健康检查 // ============================================================ /** * 数据库连通性检查。 * 执行轻量查询 `SELECT 1`,超时 5 秒。 * * @returns true 表示数据库可达 */ export async function healthCheck(): Promise { try { const client = await pool.connect(); try { await client.query("SELECT 1"); return true; } finally { client.release(); } } catch { return false; } } /** * 深度健康检查:验证 TimescaleDB 扩展是否已安装。 * 仅在首次连接或定期巡检时调用。 * * @returns TimescaleDB 版本字符串,未安装则返回 null */ export async function timescaleVersion(): Promise { try { const { rows } = await pool.query<{ extversion: string }>( "SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'", ); return rows[0]?.extversion ?? null; } catch { return null; } } // ============================================================ // 4. 事务辅助 // ============================================================ /** * 在单连接上执行事务。 * 自动 BEGIN / COMMIT / ROLLBACK,连接用完即释放。 * * @param fn - 事务体,接收 pg.PoolClient,返回 Promise * @returns fn 的返回值 * @throws 事务内任何异常都会触发 ROLLBACK 并向上抛出 * * @example * const result = await withTransaction(async (client) => { * await client.query("INSERT INTO ..."); * await client.query("UPDATE ..."); * return { success: true }; * }); */ export async function withTransaction( fn: (client: pg.PoolClient) => Promise, ): Promise { const client = await pool.connect(); try { await client.query("BEGIN"); const result = await fn(client); await client.query("COMMIT"); return result; } catch (err) { await client.query("ROLLBACK"); throw err; } finally { client.release(); } } // ============================================================ // 5. 优雅关闭 // ============================================================ /** 是否正在关闭(防止重复调用) */ let shuttingDown = false; /** * 关闭连接池。 * 先等待所有进行中的查询完成(drain),再断开所有连接。 * * 应在进程退出前调用:SIGTERM / SIGINT 处理器中。 */ export async function closePool(): Promise { if (shuttingDown) { return; } shuttingDown = true; console.log("[pg] 正在关闭连接池..."); // pool.end() 等待所有活跃查询完成后关闭 await pool.end(); console.log("[pg] 连接池已关闭"); } /** * 注册进程信号处理器——优雅关闭。 * 在应用入口调用一次即可。 */ export function registerShutdownHandlers(): void { const shutdown = async (signal: string) => { console.log(`[pg] 收到 ${signal} 信号,开始优雅关闭...`); await closePool(); process.exit(0); }; process.once("SIGTERM", () => shutdown("SIGTERM")); process.once("SIGINT", () => shutdown("SIGINT")); } // ============================================================ // 6. Schema 初始化(基于 data/schema/*.sql) // ============================================================ /** * 将 SQL 文本按语句拆分为独立命令。 * 规则: * - 按 `;` 分割 * - 跳过空行和纯注释行 * - 单个语句去除首尾空白后若为空则跳过 * * 注意:此方法假设 SQL 中 `;` 仅作为语句分隔符, * 不含存储过程/函数体内的 `;`(schema/*.sql 满足此条件)。 */ function splitSQLStatements(sql: string): string[] { const statements: string[] = []; for (const raw of sql.split(";")) { const trimmed = raw.trim(); // 跳过空语句 if (trimmed === "") { continue; } // 跳过仅包含注释的行(已在上层过滤,此处兜底) statements.push(trimmed); } return statements; } /** * 从单个 .sql 文件初始化 Schema。 * 读取文件 → 拆分语句 → 逐条执行(同一连接,保证顺序)。 * * 所有 SQL 均使用 IF NOT EXISTS / ON CONFLICT DO NOTHING, * 因此重复执行安全幂等。 * * @param filePath - SQL 文件的绝对路径 * @returns 成功执行的语句数 */ export async function initSchemaFromFile(filePath: string): Promise { const sql = readFileSync(filePath, "utf-8"); // 预处理:移除纯注释行(以 -- 开头),减少无效语句 const lines = sql .split("\n") .filter((line: string) => { const trimmed = line.trim(); return trimmed !== "" && !trimmed.startsWith("--"); }) .join("\n"); const statements = splitSQLStatements(lines); if (statements.length === 0) { return 0; } let executed = 0; // 使用单连接执行所有语句,保证 DDL 顺序(如先建表再建索引) const client = await pool.connect(); try { for (const stmt of statements) { await client.query(stmt); executed++; } } finally { client.release(); } return executed; } /** * 从 data/schema/ 目录初始化所有表结构。 * * 执行顺序(按文件名排序): * 1. klines.sql — K 线主表(hypertable)、索引、压缩、连续聚合 * 2. config.sql — 配置表(monitored_symbols / exchange_config / app_config)+ 预置数据 * * Docker Compose 首次启动时,001_init.sql 已被自动执行。 * 此函数作为补充,确保以下场景: * - 裸 PostgreSQL 安装(非 Docker 部署) * - 版本升级时需要新增表/索引 * - 开发环境快速重置 Schema * * @param schemaDir - schema 目录路径,默认 ../schema(相对于本文件) * @returns 各文件执行结果摘要 */ export async function initSchema(schemaDir?: string): Promise<{ files: string[]; totalStatements: number; errors: string[]; }> { // 计算 schema 目录绝对路径(ESM 中 __dirname 不可用) const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); const dir = schemaDir ?? resolve(__dirname, "..", "schema"); const result = { files: [] as string[], totalStatements: 0, errors: [] as string[] }; // 读取目录,按文件名排序保证执行顺序(klines.sql 先于 config.sql) let entries: string[]; try { entries = readdirSync(dir) .filter((f: string) => f.endsWith(".sql")) .sort(); // 字母序 } catch { result.errors.push(`无法读取 schema 目录: ${dir}`); return result; } // 手动排序:klines.sql 必须在 config.sql 之前(外键/引用依赖) const klinesFirst = ["klines.sql", "config.sql"]; entries.sort((a, b) => { const ia = klinesFirst.indexOf(a); const ib = klinesFirst.indexOf(b); if (ia !== -1 && ib !== -1) return ia - ib; if (ia !== -1) return -1; if (ib !== -1) return 1; return a.localeCompare(b); }); for (const entry of entries) { const filePath = resolve(dir, entry); try { const count = await initSchemaFromFile(filePath); result.files.push(`${entry} (${count} 条)`); result.totalStatements += count; } catch (err) { const msg = `${entry}: ${(err as Error).message}`; result.errors.push(msg); console.error(`[pg] Schema 初始化失败 — ${msg}`); } } if (result.errors.length === 0) { console.log( `[pg] Schema 初始化完成 — ${result.files.length} 个文件, ${result.totalStatements} 条语句`, ); } return result; } /** * 快速判断核心表是否已存在(轻量级检查,不做全量 Schema 验证)。 * 仅在执行 initSchema() 前做一次探测,已存在则跳过。 * * @returns true 表示 klines 和 monitored_symbols 表均已存在 */ export async function isSchemaInitialized(): Promise { try { const { rows } = await pool.query<{ count: string }>(` SELECT COUNT(*)::TEXT AS count FROM information_schema.tables WHERE table_schema = 'public' AND table_name IN ('klines', 'monitored_symbols', 'exchange_config', 'app_config') `); // 4 张核心表全部存在 return parseInt(rows[0]?.count ?? "0", 10) >= 4; } catch { return false; } } // ============================================================ // 7. 默认导出(便于 import pool from "./db/pg") // ============================================================ export default pool;