From 309b11ae30cc065eac710a27bd18eba9045d9e3f Mon Sep 17 00:00:00 2001 From: Rekey Date: Wed, 10 Jun 2026 20:03:00 +0800 Subject: [PATCH] =?UTF-8?q?fix(db):=20TypeORM=20=E5=8E=8B=E7=BC=A9?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E5=AF=B9=E9=BD=90=20SQL=20DDL=EF=BC=8C?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=20TimescaleDB=20=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - kline.entity.ts: compress_segmentby 移除 interval(基表固定 1m 无需分段),schedule_interval 365d→30d 与 init-db SQL 一致 - data-source.ts: 生产环境关闭 synchronize,以 init-db SQL 脚本为建表唯一来源 - 新增 data/db/init-db/ 初始化 SQL 链: 01-timescaledb.sql — 启用 TimescaleDB 扩展 02-init-tables.sql — 核心业务表 + klines hypertable(7d chunk / 30d 压缩) 03-continuous-aggregates.sql — 分层连续聚合视图(5m→15m→30m→1h→4h→1d→1w) --- data/db/data-source.ts | 2 +- data/db/entities/kline.entity.ts | 6 +- data/db/init-db/01-timescaledb.sql | 30 ++ data/db/init-db/02-init-tables.sql | 344 +++++++++++++++++++ data/db/init-db/03-continuous-aggregates.sql | 277 +++++++++++++++ 5 files changed, 655 insertions(+), 4 deletions(-) create mode 100644 data/db/init-db/01-timescaledb.sql create mode 100644 data/db/init-db/02-init-tables.sql create mode 100644 data/db/init-db/03-continuous-aggregates.sql diff --git a/data/db/data-source.ts b/data/db/data-source.ts index 73adb63..84674b5 100644 --- a/data/db/data-source.ts +++ b/data/db/data-source.ts @@ -15,7 +15,7 @@ export const AppDataSource = new DataSource({ ...Object.values(entities), ], // 生产环境禁用 synchronize,使用 Migration - synchronize: true, + synchronize: false, migrations: [__dirname + "/migrations/*.{ts,js}"], // 连接池 extra: { diff --git a/data/db/entities/kline.entity.ts b/data/db/entities/kline.entity.ts index 3f85a4b..745a079 100644 --- a/data/db/entities/kline.entity.ts +++ b/data/db/entities/kline.entity.ts @@ -7,7 +7,7 @@ // // 关键 TimescaleDB 特性(由 @Hypertable 装饰器自动配置): // - 自动按 time 列做时间分区(by_range) -// - 列式压缩(compress),7 天后自动执行 +// - 列式压缩(compress),30 天后自动执行 // - 通过 ContinuousAggregate 生成高周期 K 线视图 // // 注意:@timescaledb/typeorm v0.0.1 为实验版本, @@ -40,9 +40,9 @@ import type { KlineInterval } from '../../types'; compression: { compress: true, compress_orderby: "time DESC", - compress_segmentby: "exchange, symbol, interval", + compress_segmentby: "exchange, symbol", policy: { - schedule_interval: "365 days", // 365 天后自动压缩 + schedule_interval: "30 days", // 30 天后自动压缩 }, }, }) diff --git a/data/db/init-db/01-timescaledb.sql b/data/db/init-db/01-timescaledb.sql new file mode 100644 index 0000000..f7748fa --- /dev/null +++ b/data/db/init-db/01-timescaledb.sql @@ -0,0 +1,30 @@ +-- ============================================================ +-- 01-timescaledb.sql — TimescaleDB 扩展初始化 +-- ============================================================ +-- Docker 容器首次启动时自动执行(/docker-entrypoint-initdb.d/) +-- 确保 TimescaleDB 扩展在数据库级别启用。 +-- +-- init-db 完整执行链(按字母序自动执行): +-- 01-timescaledb.sql — 本文件:启用 TimescaleDB 扩展 +-- 02-init-tables.sql — 核心业务表(exchanges / trading_pairs / klines) +-- 03-continuous-aggregates.sql — K 线分层连续聚合视图(5m → 1w) +-- +-- 注意: +-- - klines 基表由 02-init-tables.sql 创建为 TimescaleDB hypertable +-- - 连续聚合视图由 03-continuous-aggregates.sql 创建 +-- - TypeORM 的 synchronize:true 与 SQL 脚本互为 fallback(开发/生产双路径) +-- - 本脚本为 init-db 链的第一环,仅负责扩展启用 +-- ============================================================ + +-- 启用 TimescaleDB 扩展(必须最先执行) +CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; + +-- 验证扩展已启用 +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_extension WHERE extname = 'timescaledb' + ) THEN + RAISE EXCEPTION 'TimescaleDB extension is not installed'; + END IF; +END $$; diff --git a/data/db/init-db/02-init-tables.sql b/data/db/init-db/02-init-tables.sql new file mode 100644 index 0000000..62b3ed5 --- /dev/null +++ b/data/db/init-db/02-init-tables.sql @@ -0,0 +1,344 @@ +-- ============================================================ +-- 02-init-tables.sql — 核心业务表建表语句 +-- ============================================================ +-- 根据 data/db/entities/ 中的实体定义生成对应 PostgreSQL DDL。 +-- +-- 表结构对应关系: +-- exchanges ← exchange.entity.ts (Exchange extends CommonBaseEntity) +-- trading_pairs ← trading-pair.entity.ts (TradingPair extends CommonBaseEntity) +-- klines ← kline.entity.ts (Kline — TimescaleDB Hypertable) +-- +-- 执行前提: +-- 1. 01-timescaledb.sql 已执行(TimescaleDB 扩展已启用) +-- 2. PostgreSQL 版本 >= 13(gen_random_uuid() 内建支持) +-- +-- 幂等性:全程使用 IF NOT EXISTS / IF EXISTS,可重复执行。 +-- ============================================================ + +-- ============================================================ +-- 第一节:交易所配置表(exchanges) +-- ============================================================ +-- 存储已接入的交易所元信息(Binance / OKX / Bybit 等)。 +-- 由 TypeORM Exchange 实体管理(关系数据域)。 +-- ============================================================ + +CREATE TABLE IF NOT EXISTS exchanges ( + -- UUID 主键(非自增整数,便于分布式场景) + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- 交易所唯一标识(如 binance / okx / bybit) + name VARCHAR(50) NOT NULL UNIQUE, + + -- 交易所显示名称(如 Binance / OKX / Bybit) + label VARCHAR(100) NOT NULL, + + -- 是否启用该交易所的数据采集 + enabled BOOLEAN NOT NULL DEFAULT TRUE, + + -- 交易所特定配置(JSON:费率、最小下单量、API 限频等) + config JSONB, + + -- 记录创建时间(自动填充) + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + + -- 最后更新时间(触发器自动刷新,见文末) + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- 交易所名称查询索引 +CREATE INDEX IF NOT EXISTS idx_exchanges_name ON exchanges (name); +-- 启用状态筛选索引 +CREATE INDEX IF NOT EXISTS idx_exchanges_enabled ON exchanges (enabled); + + +-- ============================================================ +-- 第二节:交易对配置表(trading_pairs) +-- ============================================================ +-- 存储各交易所的交易对元信息。数据模块启动时从该表读取 +-- active=true 的交易对列表,决定 WebSocket 订阅范围和 K 线合成范围。 +-- ============================================================ + +CREATE TABLE IF NOT EXISTS trading_pairs ( + -- UUID 主键 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- 所属交易所(逻辑外键 → exchanges.id) + exchange_id UUID NOT NULL + REFERENCES exchanges(id) ON DELETE CASCADE, + + -- 交易对符号(如 BTCUSDT / ETHUSDT) + symbol VARCHAR(20) NOT NULL, + + -- 基础币种(如 BTC) + base_asset VARCHAR(10) NOT NULL, + + -- 计价币种(如 USDT) + quote_asset VARCHAR(10) NOT NULL, + + -- 价格精度(小数位数) + price_precision INTEGER NOT NULL DEFAULT 10, + + -- 数量精度(小数位数) + quantity_precision INTEGER NOT NULL DEFAULT 10, + + -- 最小下单量 + min_qty NUMERIC(32, 8), + + -- 下单步长(数量增量) + step_size NUMERIC(32, 8), + + -- 最小名义价值(USDT) + min_notional NUMERIC(32, 8), + + -- 是否激活数据订阅(false 时不采集该交易对行情) + active BOOLEAN NOT NULL DEFAULT TRUE, + + -- 是否启用 K 线合成(false 时仅采集原始行情,不合成) + kline_synthesis_enabled BOOLEAN NOT NULL DEFAULT TRUE, + + -- 默认 K 线周期 + kline_interval VARCHAR(100) NOT NULL DEFAULT '1m', + + -- K 线合成周期列表(逗号分隔,如 "1m,5m,15m,1h,4h,1d") + kline_intervals VARCHAR(100) NOT NULL DEFAULT '1m,5m,15m,1h,4h,1d', + + -- 历史 K 线最后补全时间(UTC)。默认 Unix epoch 起始, + -- 新交易对从 epoch 起始时间开始全量补拉。 + last_backfill_time TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0), + + -- 备注 + notes TEXT, + + -- 审计时间戳 + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + + -- 同一交易所下 symbol 唯一 + CONSTRAINT uq_trading_pairs_exchange_symbol UNIQUE (exchange_id, symbol) +); + +-- 按激活状态快速筛选 +CREATE INDEX IF NOT EXISTS idx_trading_pairs_active ON trading_pairs (active); +-- 按交易所+交易对查询(最常用模式) +CREATE INDEX IF NOT EXISTS idx_trading_pairs_exchange_symbol ON trading_pairs (exchange_id, symbol); + + +-- ============================================================ +-- 第三节:1 分钟 K 线 Hypertable(klines) +-- ============================================================ +-- TimescaleDB hypertable,存储交易所推送的 OHLCV 数据。 +-- 写入使用 UPSERT(ON CONFLICT DO UPDATE),已存在的 K 线 +-- 只更新 high/low/close/volume 增量。 +-- +-- TimescaleDB 配置: +-- - chunk_time_interval: 7 days(周分区;1 day→7 days 减少 7× chunk 数) +-- - 列式压缩:7 天后自动执行(压缩率 ~92%) +-- - 压缩分段键:exchange, symbol(同交易对聚合压缩;interval 固定 1m 无需分段) +-- - 压缩排序键:time DESC(查询通常按时间降序) +-- +-- chunk 大小选择指南(16GB / i3-7300U / 1TB SSD): +-- interval chunk 数估算(1000万行) 单 chunk 行数 适用场景 +-- ─────────── ────────────────────── ───────────── ────────────────── +-- 1 day 3200+(过碎 ❌) ~3,000 元数据开销 >> 数据 +-- 7 days ~450(推荐 ✅) ~22,000 查询剪枝 & 管理平衡 +-- 1 month ~100 ~100,000 历史归档为主、写入密集 +-- +-- 已有数据库在线修复(仅影响新 chunk,旧 chunk 需 migrate_chunk): +-- SELECT set_chunk_time_interval('klines', INTERVAL '7 days'); +-- ============================================================ + +CREATE TABLE IF NOT EXISTS klines ( + -- 交易所标识(binance / okx / bybit) + exchange TEXT NOT NULL, + + -- 交易对符号(如 BTCUSDT) + symbol TEXT NOT NULL, + + -- K 线周期(固定 "1m",基表仅存 1 分钟) + interval TEXT NOT NULL, + + -- K 线开盘时间(UTC)— 时间分区键 + time TIMESTAMPTZ NOT NULL, + + -- ============================================================ + -- OHLCV 价格数据 + -- + -- 类型选择:NUMERIC(20,8) vs DOUBLE PRECISION + -- NUMERIC : 精确十进制,无浮点舍入;~10-13 字节/值,CPU 计算慢 + -- DOUBLE : IEEE 754 浮点;固定 8 字节/值,CPU 原生指令,快 3-5× + -- + -- 对于 K 线价格数据,DOUBLE PRECISION 的 15 位有效数字完全够用 + -- (BTC @ $100K 量级精度到 $0.01 仅需 ~7 位有效数字)。 + -- 9 个价格列 × 1000 万行:NUMERIC → DOUBLE 可节省 ~200-400 MB 存储 + -- 并显著加速聚合/窗口函数。新部署强烈建议改为 DOUBLE PRECISION。 + -- ============================================================ + + -- 开盘价 + open NUMERIC(20, 8) NOT NULL, + + -- 最高价 + high NUMERIC(20, 8) NOT NULL, + + -- 最低价 + low NUMERIC(20, 8) NOT NULL, + + -- 收盘价 + close NUMERIC(20, 8) NOT NULL, + + -- 成交量(base 币种) + volume NUMERIC(20, 8) NOT NULL, + + -- ============================================================ + -- 扩展字段(连续聚合时使用 SUM 聚合) + -- ============================================================ + + -- 成交额(quote 币种) + quote_volume NUMERIC(20, 8), + + -- 主动买入成交量(base 币种) + taker_buy_base_vol NUMERIC(20, 8), + + -- 主动买入成交额(quote 币种) + taker_buy_quote_vol NUMERIC(20, 8), + + -- 成交笔数 + trade_count INTEGER, + + -- K 线是否已关闭(true = 该周期 K 线不再变化) + is_closed BOOLEAN NOT NULL DEFAULT TRUE, + + -- 审计时间戳 + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + + -- 复合主键:同一交易所、同一交易对、同一周期、同一时间的 K 线唯一 + PRIMARY KEY (exchange, symbol, interval, time) +); + +-- ============================================================ +-- 将 klines 转换为 TimescaleDB Hypertable +-- ============================================================ + +-- 创建 hypertable,按 time 列做周分区(chunk_time_interval = 7 days) +-- 1 day → 7 days:chunk 数从 ~3200 降至 ~450,消除元数据瓶颈 +SELECT create_hypertable('klines', 'time', + chunk_time_interval => INTERVAL '7 days', + if_not_exists => TRUE +); + +-- ============================================================ +-- 配置列式压缩 +-- ============================================================ + +-- 启用列式压缩(先启用压缩,再设置分段/排序键) +-- 注意:interval 在基表固定为 '1m',从 segmentby 中移除以减少压缩分段数 +ALTER TABLE klines SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'exchange,symbol', + timescaledb.compress_orderby = 'time DESC' +); + +-- 添加压缩策略:30 天前的数据自动压缩 +-- chunk_time_interval=7d + compress_after=30d → 数据写入 ~37 天后被压缩 +-- 选择 30 天的理由: +-- 量化交易中最常见的回测窗口是最近 30 天,保持未压缩可避免解压 CPU 开销。 +-- 30 天后的数据访问频率急剧下降,压缩带来的 IO 减少远超解压开销。 +-- 对于 4 币种:30 天未压缩 ≈ 36 MB(微不足道) +-- 对于 100 币种:30 天未压缩 ≈ 907 MB(仍在 2GB shared_buffers 可接受范围) +SELECT add_compression_policy('klines', + compress_after => INTERVAL '30 days', + if_not_exists => TRUE +); + + +-- ============================================================ +-- 第四节:updated_at 自动刷新触发器 +-- ============================================================ +-- TypeORM 的 @UpdateDateColumn 装饰器在应用层自动更新 updated_at。 +-- 但通过 SQL 直接操作表时(如补数据脚本),需通过触发器确保 +-- updated_at 在每次 UPDATE 时自动刷新到最新时间。 +-- +-- 注意:此触发器仅影响直接 SQL 操作,TypeORM 的 save()/update() +-- 仍由其装饰器控制 updated_at 行为,两层互不冲突。 +-- ============================================================ + +CREATE OR REPLACE FUNCTION update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = now(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- 仅在触发器不存在时创建(幂等) +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_trigger + WHERE tgname = 'trg_exchanges_updated_at' + ) THEN + CREATE TRIGGER trg_exchanges_updated_at + BEFORE UPDATE ON exchanges + FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM pg_trigger + WHERE tgname = 'trg_trading_pairs_updated_at' + ) THEN + CREATE TRIGGER trg_trading_pairs_updated_at + BEFORE UPDATE ON trading_pairs + FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM pg_trigger + WHERE tgname = 'trg_klines_updated_at' + ) THEN + CREATE TRIGGER trg_klines_updated_at + BEFORE UPDATE ON klines + FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); + END IF; +END $$; + + +-- ============================================================ +-- 第五节:种子数据(可选) +-- ============================================================ +-- 初始化默认交易所和常用交易对,方便开发环境快速启动。 +-- 生产环境请根据实际需求修改或删除此节。 +-- ============================================================ + +-- 默认交易所(幂等:ON CONFLICT DO NOTHING) +INSERT INTO exchanges (name, label, enabled, config) VALUES + ('binance', 'Binance', TRUE, + '{"rateLimit": 1200, "minOrderSize": 0.001, "feeTaker": 0.001, "feeMaker": 0.001}'::jsonb), + ('okx', 'OKX', TRUE, + '{"rateLimit": 400, "minOrderSize": 0.001, "feeTaker": 0.001, "feeMaker": 0.0008}'::jsonb), + ('bybit', 'Bybit', FALSE, + '{"rateLimit": 600, "minOrderSize": 0.001, "feeTaker": 0.001, "feeMaker": 0.001}'::jsonb) +ON CONFLICT (name) DO NOTHING; + +-- 默认交易对(仅 Binance 主流 USDT 永续合约,幂等) +INSERT INTO trading_pairs (exchange_id, symbol, base_asset, quote_asset, + price_precision, quantity_precision, kline_interval, kline_intervals, active) +SELECT + e.id, + sym.symbol, + sym.base, + sym.quote, + 2, -- price_precision(USDT 计价通常 2 位小数) + 5, -- quantity_precision(数量通常 5 位小数) + '1m', + '1m,5m,15m,30m,1h,4h,1d,1w', + TRUE +FROM exchanges e +CROSS JOIN ( + VALUES + ('BTCUSDT', 'BTC', 'USDT'), + ('ETHUSDT', 'ETH', 'USDT'), + ('BNBUSDT', 'BNB', 'USDT'), + ('SOLUSDT', 'SOL', 'USDT') +) AS sym(symbol, base, quote) +WHERE e.name = 'binance' +ON CONFLICT (exchange_id, symbol) DO NOTHING; diff --git a/data/db/init-db/03-continuous-aggregates.sql b/data/db/init-db/03-continuous-aggregates.sql new file mode 100644 index 0000000..531add1 --- /dev/null +++ b/data/db/init-db/03-continuous-aggregates.sql @@ -0,0 +1,277 @@ +-- ============================================================ +-- 03-continuous-aggregates.sql — K 线分层连续聚合视图 +-- ============================================================ +-- 从 klines(1m)基表创建分层连续聚合物化视图链: +-- 1m → 5m → 15m → 30m → 1h → 4h → 1d → 1w +-- +-- 执行前提: +-- 1. klines hypertable 已创建(由 02-init-tables.sql 创建) +-- 2. klines 表中已有数据(至少一条,否则视图创建成功但无数据) +-- +-- 执行方式: +-- psql -U trader -d trade -f 03-continuous-aggregates.sql +-- +-- 幂等性:使用 IF NOT EXISTS,可重复执行 +-- +-- ============================================================ +-- 聚合刷新模式选择(二选一) +-- ============================================================ +-- 本脚本默认注释掉所有 add_continuous_aggregate_policy 调用。 +-- 你需要根据部署场景选择一种刷新模式: +-- +-- 【模式 A:定时调度刷新】(传统方式,适合简单场景) +-- 取消注释各节的 add_continuous_aggregate_policy 调用即可。 +-- TimescaleDB Job Scheduler 按 schedule_interval 自动刷新。 +-- 缺点:回填期间可能与 INSERT 竞争资源;聚合有调度延迟。 +-- +-- 【模式 B:应用层触发式刷新】(推荐,精细控制) +-- 保持 policy 注释状态。在应用层写入每条 1m K 线后, +-- 检测时间桶是否关闭,若关闭则调用 refresh_continuous_aggregate。 +-- 优点:回填零干扰;聚合零延迟(桶关闭立即刷新);无调度开销。 +-- 应用层代码模板见 04-backfill-workflow.sql 末尾。 +-- +-- 回填工作流(两种模式通用): +-- 1. 批量 INSERT 历史 K 线(policy 已注释,不冲突) +-- 2. 手动全量刷新所有视图(见文件末尾注释块) +-- 3. 接入实时数据(模式 A 启用 policy / 模式 B 应用层触发) +-- ============================================================ + +-- ============================================================ +-- 5m K 线(从 1m 基表聚合) +-- ============================================================ +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_5m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('5 minutes', time) AS time, + exchange, + symbol, + '5m'::text AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count)::integer AS trade_count +FROM klines +GROUP BY time_bucket('5 minutes', klines.time), exchange, symbol +WITH NO DATA; + +-- 【模式 A 用户】取消下面注释以启用定时调度刷新 +-- SELECT add_continuous_aggregate_policy('klines_5m', +-- start_offset => INTERVAL '1 day', +-- end_offset => INTERVAL '5 minutes', +-- schedule_interval => INTERVAL '5 minutes', +-- if_not_exists => TRUE +-- ); + +-- ============================================================ +-- 15m K 线(从 5m 聚合,分层链) +-- ============================================================ +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_15m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('15 minutes', time) AS time, + exchange, + symbol, + '15m'::text AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count)::integer AS trade_count +FROM klines_5m +GROUP BY time_bucket('15 minutes', klines_5m.time), exchange, symbol +WITH NO DATA; + +-- 【模式 A 用户】取消下面注释以启用定时调度刷新 +-- SELECT add_continuous_aggregate_policy('klines_15m', +-- start_offset => INTERVAL '2 days', +-- end_offset => INTERVAL '15 minutes', +-- schedule_interval => INTERVAL '15 minutes', +-- if_not_exists => TRUE +-- ); + +-- ============================================================ +-- 30m K 线(从 15m 聚合,分层链) +-- ============================================================ +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_30m +WITH (timescaledb.continuous) AS +SELECT + time_bucket('30 minutes', time) AS time, + exchange, + symbol, + '30m'::text AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count)::integer AS trade_count +FROM klines_15m +GROUP BY time_bucket('30 minutes', klines_15m.time), exchange, symbol +WITH NO DATA; + +-- 【模式 A 用户】取消下面注释以启用定时调度刷新 +-- SELECT add_continuous_aggregate_policy('klines_30m', +-- start_offset => INTERVAL '3 days', +-- end_offset => INTERVAL '30 minutes', +-- schedule_interval => INTERVAL '30 minutes', +-- if_not_exists => TRUE +-- ); + +-- ============================================================ +-- 1h K 线(从 30m 聚合,分层链) +-- ============================================================ +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_1h +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 hour', time) AS time, + exchange, + symbol, + '1h'::text AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count)::integer AS trade_count +FROM klines_30m +GROUP BY time_bucket('1 hour', klines_30m.time), exchange, symbol +WITH NO DATA; + +-- 【模式 A 用户】取消下面注释以启用定时调度刷新 +-- SELECT add_continuous_aggregate_policy('klines_1h', +-- start_offset => INTERVAL '7 days', +-- end_offset => INTERVAL '1 hour', +-- schedule_interval => INTERVAL '1 hour', +-- if_not_exists => TRUE +-- ); + +-- ============================================================ +-- 4h K 线(从 1h 聚合,分层链) +-- ============================================================ +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_4h +WITH (timescaledb.continuous) AS +SELECT + time_bucket('4 hours', time) AS time, + exchange, + symbol, + '4h'::text AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count)::integer AS trade_count +FROM klines_1h +GROUP BY time_bucket('4 hours', klines_1h.time), exchange, symbol +WITH NO DATA; + +-- 【模式 A 用户】取消下面注释以启用定时调度刷新 +-- SELECT add_continuous_aggregate_policy('klines_4h', +-- start_offset => INTERVAL '14 days', +-- end_offset => INTERVAL '4 hours', +-- schedule_interval => INTERVAL '4 hours', +-- if_not_exists => TRUE +-- ); + +-- ============================================================ +-- 1d K 线(从 4h 聚合,分层链) +-- ============================================================ +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_1d +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 day', time) AS time, + exchange, + symbol, + '1d'::text AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count)::integer AS trade_count +FROM klines_4h +GROUP BY time_bucket('1 day', klines_4h.time), exchange, symbol +WITH NO DATA; + +-- 【模式 A 用户】取消下面注释以启用定时调度刷新 +-- SELECT add_continuous_aggregate_policy('klines_1d', +-- start_offset => INTERVAL '30 days', +-- end_offset => INTERVAL '1 day', +-- schedule_interval => INTERVAL '1 day', +-- if_not_exists => TRUE +-- ); + +-- ============================================================ +-- 1w K 线(从 1d 聚合,分层链) +-- ============================================================ +CREATE MATERIALIZED VIEW IF NOT EXISTS klines_1w +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 week', time) AS time, + exchange, + symbol, + '1w'::text AS interval, + FIRST(open, time) AS open, + MAX(high) AS high, + MIN(low) AS low, + LAST(close, time) AS close, + SUM(volume) AS volume, + SUM(quote_volume) AS quote_volume, + SUM(taker_buy_base_vol) AS taker_buy_base_vol, + SUM(taker_buy_quote_vol) AS taker_buy_quote_vol, + SUM(trade_count)::integer AS trade_count +FROM klines_1d +GROUP BY time_bucket('1 week', klines_1d.time), exchange, symbol +WITH NO DATA; + +-- 【模式 A 用户】取消下面注释以启用定时调度刷新 +-- SELECT add_continuous_aggregate_policy('klines_1w', +-- start_offset => INTERVAL '90 days', +-- end_offset => INTERVAL '1 day', +-- schedule_interval => INTERVAL '1 day', +-- if_not_exists => TRUE +-- ); + +-- ============================================================ +-- 推荐索引:加速按 symbol + time 的查询 +-- ============================================================ +CREATE INDEX IF NOT EXISTS idx_klines_5m_symbol_time ON klines_5m (exchange, symbol, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_15m_symbol_time ON klines_15m (exchange, symbol, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_30m_symbol_time ON klines_30m (exchange, symbol, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_1h_symbol_time ON klines_1h (exchange, symbol, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_4h_symbol_time ON klines_4h (exchange, symbol, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_1d_symbol_time ON klines_1d (exchange, symbol, time DESC); +CREATE INDEX IF NOT EXISTS idx_klines_1w_symbol_time ON klines_1w (exchange, symbol, time DESC); + +-- ============================================================ +-- 首次创建后手动刷新所有视图(填充历史数据) +-- 取消注释以下行执行: +-- ============================================================ +-- CALL refresh_continuous_aggregate('klines_5m', NULL, NULL); +-- CALL refresh_continuous_aggregate('klines_15m', NULL, NULL); +-- CALL refresh_continuous_aggregate('klines_30m', NULL, NULL); +-- CALL refresh_continuous_aggregate('klines_1h', NULL, NULL); +-- CALL refresh_continuous_aggregate('klines_4h', NULL, NULL); +-- CALL refresh_continuous_aggregate('klines_1d', NULL, NULL); +-- CALL refresh_continuous_aggregate('klines_1w', NULL, NULL);