-- ============================================================
-- 001_init.sql — TimescaleDB data initialization
--
-- Executed automatically the first time Docker Compose starts the database.
-- Mount path: ./data/init-db:/docker-entrypoint-initdb.d
-- ============================================================

-- Extensions
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- NOTE(review): nothing in this script calls a toolkit function (FIRST/LAST
-- below are core TimescaleDB aggregates). Presumably application queries need
-- it; confirm the database image actually ships timescaledb_toolkit.
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit;

-- ============================================================
-- 1. Main kline (candlestick) table
-- ============================================================
CREATE TABLE IF NOT EXISTS klines (
    -- Time dimension
    time                TIMESTAMPTZ   NOT NULL,            -- kline open time (UTC)

    -- Identity dimensions
    exchange            TEXT          NOT NULL,            -- exchange: binance/okx/bybit
    symbol              TEXT          NOT NULL,            -- trading pair: BTCUSDT/ETHUSDT
    interval            TEXT          NOT NULL,            -- period: 1m/5m/15m/1h/4h/1d

    -- OHLCV
    open                NUMERIC(20,8) NOT NULL,
    high                NUMERIC(20,8) NOT NULL,
    low                 NUMERIC(20,8) NOT NULL,
    close               NUMERIC(20,8) NOT NULL,
    volume              NUMERIC(20,8) NOT NULL DEFAULT 0,  -- traded volume (base asset)

    -- Extended fields
    quote_volume        NUMERIC(20,8) DEFAULT 0,           -- turnover (quote asset)
    taker_buy_base_vol  NUMERIC(20,8) DEFAULT 0,           -- taker buy volume (base asset)
    taker_buy_quote_vol NUMERIC(20,8) DEFAULT 0,           -- taker buy turnover (quote asset)
    trade_count         INTEGER       DEFAULT 0,           -- number of trades
    is_closed           BOOLEAN       DEFAULT TRUE,        -- whether the kline is closed

    -- Metadata
    created_at          TIMESTAMPTZ   DEFAULT NOW(),
    updated_at          TIMESTAMPTZ   DEFAULT NOW(),

    -- Uniqueness: the same kline must not be stored twice.
    -- The key includes both partitioning columns (time, exchange), which a
    -- unique index on a hypertable requires.
    UNIQUE (time, exchange, symbol, interval)
);

-- ============================================================
-- 2. Convert to a hypertable (time-series partitioning)
-- ============================================================
SELECT create_hypertable(
    'klines',
    'time',                                      -- time column
    chunk_time_interval => INTERVAL '1 day',     -- one chunk per day of data
    partitioning_column => 'exchange',           -- space-partitioning column
    number_partitions   => 4,                    -- 4 hash partitions on exchange
    if_not_exists       => TRUE
);

-- ============================================================
-- 3. Indexes
-- ============================================================

-- Primary lookup index: exchange + symbol + interval over a time range
-- (the original note estimates this covers ~95% of queries).
CREATE INDEX IF NOT EXISTS idx_klines_lookup
    ON klines (exchange, symbol, interval, time DESC);

-- Backtesting index: symbol + interval in ascending time order.
CREATE INDEX IF NOT EXISTS idx_klines_backtest
    ON klines (symbol, interval, time ASC);

-- Latest-kline index: partial index covering closed klines only.
-- NOTE(review): same key columns as idx_klines_lookup; if most rows are closed
-- this index is nearly as large as the full one — confirm it earns its
-- write cost.
CREATE INDEX IF NOT EXISTS idx_klines_latest
    ON klines (exchange, symbol, interval, time DESC)
    WHERE is_closed = TRUE;

-- ============================================================
-- 4. Compression
-- ============================================================

-- Enable columnar compression: segments are grouped by
-- exchange + symbol + interval and ordered by time (newest first).
ALTER TABLE klines SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'exchange, symbol, interval',
    timescaledb.compress_orderby   = 'time DESC'
);

-- Automatic compression: chunks older than 7 days are compressed
-- (the original note estimates a ~90% compression ratio).
SELECT add_compression_policy('klines', INTERVAL '7 days', if_not_exists => TRUE);

-- ============================================================
-- 5. Data retention
-- ============================================================

-- Drop raw chunks older than 90 days. The intent is to bound 1m data
-- (backtests usually use coarser periods, served by the continuous
-- aggregates below). Retention works on whole chunks, so rows of EVERY
-- interval stored in klines are removed after 90 days, not only '1m'.
SELECT add_retention_policy('klines', INTERVAL '90 days', if_not_exists => TRUE);

-- ============================================================
-- 6. Continuous aggregates (higher-period klines derived from 1m)
-- ============================================================

-- ---------- 5m klines ----------
CREATE MATERIALIZED VIEW IF NOT EXISTS klines_5m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('5 minutes', time) AS time,
    exchange,
    symbol,
    '5m'::TEXT                     AS interval,
    FIRST(open, time)              AS open,
    MAX(high)                      AS high,
    MIN(low)                       AS low,
    LAST(close, time)              AS close,
    SUM(volume)                    AS volume,
    SUM(quote_volume)              AS quote_volume,
    SUM(taker_buy_base_vol)        AS taker_buy_base_vol,
    SUM(taker_buy_quote_vol)       AS taker_buy_quote_vol,
    SUM(trade_count)               AS trade_count
FROM klines
WHERE interval = '1m'
GROUP BY time_bucket('5 minutes', time), exchange, symbol;

SELECT add_continuous_aggregate_policy('klines_5m',
    start_offset      => INTERVAL '1 day',
    end_offset        => INTERVAL '10 minutes',
    schedule_interval => INTERVAL '1 minute',
    if_not_exists     => TRUE
);

-- ---------- 15m klines ----------
CREATE MATERIALIZED VIEW IF NOT EXISTS klines_15m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('15 minutes', time) AS time,
    exchange,
    symbol,
    '15m'::TEXT                     AS interval,
    FIRST(open, time)               AS open,
    MAX(high)                       AS high,
    MIN(low)                        AS low,
    LAST(close, time)               AS close,
    SUM(volume)                     AS volume,
    SUM(quote_volume)               AS quote_volume,
    SUM(taker_buy_base_vol)         AS taker_buy_base_vol,
    SUM(taker_buy_quote_vol)        AS taker_buy_quote_vol,
    SUM(trade_count)                AS trade_count
FROM klines
WHERE interval = '1m'
GROUP BY time_bucket('15 minutes', time), exchange, symbol;

SELECT add_continuous_aggregate_policy('klines_15m',
    start_offset      => INTERVAL '2 days',
    end_offset        => INTERVAL '30 minutes',
    schedule_interval => INTERVAL '5 minutes',
    if_not_exists     => TRUE
);

-- ---------- 1h klines ----------
CREATE MATERIALIZED VIEW IF NOT EXISTS klines_1h
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS time,
    exchange,
    symbol,
    '1h'::TEXT                  AS interval,
    FIRST(open, time)           AS open,
    MAX(high)                   AS high,
    MIN(low)                    AS low,
    LAST(close, time)           AS close,
    SUM(volume)                 AS volume,
    SUM(quote_volume)           AS quote_volume,
    SUM(taker_buy_base_vol)     AS taker_buy_base_vol,
    SUM(taker_buy_quote_vol)    AS taker_buy_quote_vol,
    SUM(trade_count)            AS trade_count
FROM klines
WHERE interval = '1m'
GROUP BY time_bucket('1 hour', time), exchange, symbol;

SELECT add_continuous_aggregate_policy('klines_1h',
    start_offset      => INTERVAL '3 days',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '5 minutes',
    if_not_exists     => TRUE
);

-- ---------- 1d klines ----------
CREATE MATERIALIZED VIEW IF NOT EXISTS klines_1d
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', time) AS time,
    exchange,
    symbol,
    '1d'::TEXT                 AS interval,
    FIRST(open, time)          AS open,
    MAX(high)                  AS high,
    MIN(low)                   AS low,
    LAST(close, time)          AS close,
    SUM(volume)                AS volume,
    SUM(quote_volume)          AS quote_volume,
    SUM(taker_buy_base_vol)    AS taker_buy_base_vol,
    SUM(taker_buy_quote_vol)   AS taker_buy_quote_vol,
    SUM(trade_count)           AS trade_count
FROM klines
WHERE interval = '1m'
GROUP BY time_bucket('1 day', time), exchange, symbol;

SELECT add_continuous_aggregate_policy('klines_1d',
    start_offset      => INTERVAL '7 days',
    end_offset        => INTERVAL '2 hours',
    schedule_interval => INTERVAL '1 hour',
    if_not_exists     => TRUE
);

-- ============================================================
-- 7. Compression of the continuous aggregates (reduce view storage)
-- ============================================================

-- Enabling compression only marks the aggregate as compressible; chunks are
-- compressed only once a compression policy is attached.
ALTER MATERIALIZED VIEW klines_5m  SET (timescaledb.compress = true);
ALTER MATERIALIZED VIEW klines_15m SET (timescaledb.compress = true);
ALTER MATERIALIZED VIEW klines_1h  SET (timescaledb.compress = true);
ALTER MATERIALIZED VIEW klines_1d  SET (timescaledb.compress = true);

-- Each compress_after is strictly greater than the start_offset of that
-- aggregate's refresh policy (1 day / 2 days / 3 days / 7 days above), so the
-- actively refreshed window is never compressed.
SELECT add_compression_policy('klines_5m',  INTERVAL '7 days',  if_not_exists => TRUE);
SELECT add_compression_policy('klines_15m', INTERVAL '7 days',  if_not_exists => TRUE);
SELECT add_compression_policy('klines_1h',  INTERVAL '14 days', if_not_exists => TRUE);
SELECT add_compression_policy('klines_1d',  INTERVAL '30 days', if_not_exists => TRUE);

-- ============================================================
-- Initialization complete
-- ============================================================
DO $$
BEGIN
    RAISE NOTICE 'TimescaleDB initialization complete.';
    RAISE NOTICE 'Hypertable: klines';
    RAISE NOTICE 'Continuous aggregates: klines_5m, klines_15m, klines_1h, klines_1d';
    RAISE NOTICE 'Compression: 7 days delay, 90 days retention';
END $$;