diff --git a/data/ARCHITECTURE.md b/data/ARCHITECTURE.md index 1566dbc..22cb72b 100644 --- a/data/ARCHITECTURE.md +++ b/data/ARCHITECTURE.md @@ -18,7 +18,8 @@ - [4.4 关系数据实体 (`db/entities/`)](#44-关系数据实体-dbentities) - [4.5 交易所适配器 (`exchanges/`)](#45-交易所适配器-exchanges) - [4.6 K 线合成管道 (`pipeline/`)](#46-k-线合成管道-pipeline) - - [4.7 数据发布 (`publisher/`)](#47-数据发布-publisher) + - [4.7 数据发布 (`publisher/`)](#47-数据发布-publisher) + - [4.8 数据补全服务 (`run/exchange.ts`)](#48-数据补全服务-runexchangets) 5. [数据流生命周期](#5-数据流生命周期) 6. [TypeORM + TimescaleDB 集成细节](#6-typeorm--timescaledb-集成细节) 7. [配置管理策略](#7-配置管理策略) @@ -138,6 +139,10 @@ data/ ├── tsconfig.json # TypeScript 配置 ├── bun.lock # Bun 依赖锁定文件 │ +├── run/ # 启动文件 +│ ├── main.ts # 模块入口:配置加载 → DB 连接 → Redis 连接 → 适配器启动 → 优雅关闭 +│ └── exchange.ts # 数据补全服务:读取 trading_pairs.last_backfill_time → 拉取缺失 K 线 → 批量写入 → 更新时间戳 +│ ├── config/ # 中心化配置模块(目录) │ ├── index.ts # 配置加载与分组导出(pgsql / redis / logging) │ └── validators.ts # 零依赖运行时校验(env.yaml → EnvConfig) @@ -159,8 +164,6 @@ data/ ├── publisher/ # Redis 数据发布(待实现) ├── types/ # 共享类型定义 ├── utils/ # 工具函数 -├── index.ts # 模块入口 -├── logger.ts # Pino 日志实例 └── tests/ # 测试 ``` @@ -849,6 +852,112 @@ export class RedisPublisher { --- +### 4.8 数据补全服务 [`run/exchange.ts`](run/exchange.ts) + +独立启动的数据补全服务,从 `trading_pairs` 表中读取每个交易对的 `last_backfill_time`,据此确定需要拉取的历史 K 线范围,补全完成后将 `last_backfill_time` 更新为最新时间点。 + +**核心机制 — 基于 `last_backfill_time` 的增量补全**: + +``` +trading_pairs 表: + +┌────┬──────────┬──────────────────────┬──────────────────────┐ +│ id │ symbol │ last_backfill_time │ kline_intervals │ +├────┼──────────┼──────────────────────┼──────────────────────┤ +│ 1 │ BTCUSDT │ 2026-06-07 12:00:00 │ 1m,5m,15m,1h,4h,1d │ +│ 2 │ ETHUSDT │ 2026-06-08 08:00:00 │ 1m,5m,1h,1d │ +└────┴──────────┴──────────────────────┴──────────────────────┘ + +补全任务生成: + +BTCUSDT → [(1m, 06/07 12:00 → now), (5m, 06/07 12:00 → now), ...] +ETHUSDT → [(1m, 06/08 08:00 → now), (5m, 06/08 08:00 → now), ...] +``` + +- `last_backfill_time` 初始值为 `1970-01-01T00:00:00Z`(epoch 起点),新交易对自动触发全量拉取 +- 每次补全完成后,更新为本次实际拉取到的最后一条 K 线时间 +- 下次运行时自动从上次结束位置继续,无重复拉取 + +**使用场景**: + +| 场景 | 触发方式 | 说明 | +|------|----------|------| +| **定期增量补全** | cron 定时触发 | 每日/每小时补齐最新数据 | +| **首次上线初始化** | 手动执行 | 新交易对 `last_backfill_time` 为 epoch,自动拉全量历史 | +| **定点修复** | `--start` / `--end` 覆盖 | 修复特定时间段的缺失数据 | +| **补全后验证** | `--dry-run` | 仅展示需拉取的任务范围,不实际请求 | + +**命令行参数**: + +```bash +# 全量模式:为所有 active 交易对执行增量补全 +bun run run/exchange.ts --concurrency 2 + +# 指定交易对: +bun run run/exchange.ts --symbols BTCUSDT,ETHUSDT + +# 手动覆盖时间范围(忽略 last_backfill_time): +bun run run/exchange.ts \ + --symbols BTCUSDT \ + --start "2026-06-01T00:00:00Z" \ + --end "2026-06-08T00:00:00Z" + +# 仅检测不拉取: +bun run run/exchange.ts --dry-run +``` + +| 参数 | 默认值 | 说明 | +|------|--------|------| +| `--exchange` | (从 DB 读取) | 限定交易所,不填则为所有启用交易所 | +| `--symbols` | (从 DB 读取所有 active) | 限定交易对列表,逗号分隔 | +| `--intervals` | (从 DB 读取 `kline_intervals`) | K 线周期,逗号分隔 | +| `--start` | `last_backfill_time`(不低于 7 天前) | 补全起始时间 (ISO 格式);不填则使用 DB 中的 `last_backfill_time` | +| `--end` | `Date.now()` | 补全结束时间 (ISO 格式) | +| `--concurrency` | `2` | 并发任务数 | +| `--batch-size` | `500` | 单次 REST 请求最大 K 线条数 | +| `--dry-run` | `false` | 仅列出任务范围,不拉取不写入 | + +**执行流程**: + +``` +1. 查询 trading_pairs 表(JOIN exchanges),获取 active=true 且 exchange.enabled=true 的交易对 +2. 为每个交易对 × 每个 kline_interval 生成一个 BackfillTask: + - startTime = --start ?? last_backfill_time(若 last_backfill_time 为 epoch 则兜底为 now-7d) + - endTime = --end ?? now + - 若 startTime >= endTime → 跳过(已是最新) +3. 按 exchange 分组,创建对应适配器实例 +4. Semaphore 并发执行(默认 2): + a. 按 batch-size 分段切分时间范围 + b. 逐段调用适配器 fetchKlines() → 写入 klines 表(UPSERT) + c. 记录本次拉取的最后一条 K 线时间 +5. 所有任务完成后,更新每个交易对的 last_backfill_time +``` + +**并发策略**: + +- 不同(symbol, interval)任务之间并行执行 +- 同一任务内部的多次分页请求串行执行(受 REST API 限频约束) +- 单个任务失败不影响其他任务,失败数记录到最终统计 + +**last_backfill_time 更新逻辑**: + +``` +任务完成后: + pairLastTimes[pairId] = max(pairLastTimes[pairId], 本次拉取最后一条 K 线的 openTime) + +最后统一: + UPDATE trading_pairs SET last_backfill_time = pairLastTimes[id] +``` + +**注意事项**: + +- 未指定 `--symbols` 且非 `--dry-run` 时,走"全量增量"模式,覆盖所有 active 交易对 +- 指定 `--start` 时不使用 `last_backfill_time`,但仍会更新 `last_backfill_time` 为实际拉取时间 +- `--dry-run` 模式下不更新 `last_backfill_time` +- 依赖交易所 REST API 限频,当前硬编码每次分页间隔 200ms(Binance) + +--- + ## 5. 数据流生命周期 ``` @@ -898,6 +1007,8 @@ export class RedisPublisher { ### 启动时序 +入口文件:[`run/main.ts`](run/main.ts) + ``` 1. 加载配置(config/index.ts → 读取 env.yaml → 零依赖校验) 2. 初始化 Pino 日志 @@ -1124,7 +1235,7 @@ export const logger = pino({ ### 9.2 优雅关闭(Graceful Shutdown) ```typescript -// data/index.ts +// data/run/main.ts async function shutdown(signal: string): Promise { logger.info({ signal }, "Shutting down"); @@ -1226,8 +1337,8 @@ cd data && bun install # 3. 配置环境 # 编辑项目根目录 env.yaml(如不存在则创建) -# 4. 验证配置加载 -bun run db/data-source.ts # 测试 DataSource 初始化 +# 4. 启动数据模块 +bun run run/main.ts # 5. 运行测试 bun test @@ -1269,6 +1380,29 @@ bunx typeorm migration:revert -d db/data-source.ts | 类型/接口 | `PascalCase` | | 测试文件 | `*.test.ts`(与源文件同目录或 `tests/` 下镜像结构) | +### 11.5 数据补全 + +```bash +# 增量模式:为所有 active 交易对补齐最新数据 +bun run run/exchange.ts --concurrency 2 + +# 仅检测需补全的任务范围 +bun run run/exchange.ts --dry-run + +# 指定交易对增量补全 +bun run run/exchange.ts --symbols BTCUSDT,ETHUSDT + +# 首次上线:拉取最近 90 天全量历史 +bun run run/exchange.ts \ + --symbols BTCUSDT \ + --start "$(date -u -v-90d '+%Y-%m-%dT%H:%M:%SZ')" \ + --intervals 1m,5m,15m,1h,4h,1d \ + --concurrency 1 + +# cron 定时任务(每小时执行) +0 * * * * cd /app && bun run data/run/exchange.ts --concurrency 2 >> /var/log/backfill.log 2>&1 +``` + --- ## 12. 风险提示 diff --git a/data/bun.lock b/data/bun.lock index c14f850..172d559 100644 --- a/data/bun.lock +++ b/data/bun.lock @@ -10,6 +10,7 @@ "ioredis": "^5.11.1", "pg": "^8.21.0", "pino": "^10.3.1", + "pino-pretty": "^13.1.3", "rxjs": "^7.8.2", "typeorm": "^1.0.0", "yaml": "^2.9.0", @@ -350,6 +351,8 @@ "cross-spawn": ["cross-spawn@7.0.6", "", { "dependencies": { "path-key": "^3.1.0", "shebang-command": "^2.0.0", "which": "^2.0.1" } }, "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA=="], + "dateformat": ["dateformat@4.6.3", "", {}, "sha512-2P0p0pFGzHS5EMnhdxQi7aJN+iMheud0UhG4dlE1DLAlvL8JHjJJTX/CSm4JXwV0Ka5nGk3zC5mcb5bUQUxxMA=="], + "dayjs": ["dayjs@1.11.21", "", {}, "sha512-98IT+HOahAisibz/yjKbzuOBwYcjJ7BCLPzARyHiyEBmRz4fatF+KPJszEHXsGYjUG234aH/cOjW1wwTbKUZlA=="], "debug": ["debug@4.4.3", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA=="], @@ -378,6 +381,8 @@ "emojis-list": ["emojis-list@3.0.0", "", {}, "sha512-/kyM18EfinwXZbno9FyUGeFh87KC8HRQBQGildHZbEuRyWFOmv1U10o9BBp8XVZDVNNuQKyIGIu5ZYAAXJ0V2Q=="], + "end-of-stream": ["end-of-stream@1.4.5", "", { "dependencies": { "once": "^1.4.0" } }, "sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg=="], + "enhanced-resolve": ["enhanced-resolve@4.5.0", "", { "dependencies": { "graceful-fs": "^4.1.2", "memory-fs": "^0.5.0", "tapable": "^1.0.0" } }, "sha512-Nv9m36S/vxpsI+Hc4/ZGRs0n9mXqSWGGq49zxb/cJfPAQMbUtttJAlNPS4AQzaBdw/pKskw5bMbekT/Y7W/Wlg=="], "envinfo": ["envinfo@7.21.0", "", { "bin": { "envinfo": "dist/cli.js" } }, "sha512-Lw7I8Zp5YKHFCXL7+Dz95g4CcbMEpgvqZNNq3AmlT5XAV6CgAAk6gyAMqn2zjw08K9BHfcNuKrMiCPLByGafow=="], @@ -422,12 +427,16 @@ "expect-type": ["expect-type@1.3.0", "", {}, "sha512-knvyeauYhqjOYvQ66MznSMs83wmHrCycNEN6Ao+2AeYEfxUIkuiVxdEa1qlGEPK+We3n0THiDciYSsCcgW/DoA=="], + "fast-copy": ["fast-copy@4.0.3", "", {}, "sha512-58apWr0GUiDFM8+3afrO6eYwJBn9ZAhDOzG3L+/9llab/haCARS2UIfffmOurYLwbgDRs8n0rfr6qAAPEAuAQw=="], + "fast-deep-equal": ["fast-deep-equal@3.1.3", "", {}, "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q=="], "fast-json-stable-stringify": ["fast-json-stable-stringify@2.1.0", "", {}, "sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw=="], "fast-levenshtein": ["fast-levenshtein@2.0.6", "", {}, "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw=="], + "fast-safe-stringify": ["fast-safe-stringify@2.1.1", "", {}, "sha512-W+KJc2dmILlPplD/H4K9l9LcAHAfPtP6BY84uVLXQ6Evcz9Lcg33Y2z1IVblT6xdY54PXYVHEv+0Wpq8Io6zkA=="], + "fast-uri": ["fast-uri@3.1.2", "", {}, "sha512-rVjf7ArG3LTk+FS6Yw81V1DLuZl1bRbNrev6Tmd/9RaroeeRRJhAt7jg/6YFxbvAQXUCavSoZhPPj6oOx+5KjQ=="], "fastest-levenshtein": ["fastest-levenshtein@1.0.16", "", {}, "sha512-eRnCtTTtGZFpQCwhJiUOuxPQWRXVKYDn0b2PeHfXL6/Zi53SLAzAHfVhVWK2AryC/WH05kGfxhFIPvTF0SXQzg=="], @@ -486,6 +495,8 @@ "hasown": ["hasown@2.0.4", "", { "dependencies": { "function-bind": "^1.1.2" } }, "sha512-T2UbfbBEF32wiepXIsMlTW9+dDYC6wMh/t/vYA4tuOMKqWz/n3vr1NFSxQiyP+zk2mXsoMA/i/7qV6LKut1t1A=="], + "help-me": ["help-me@5.0.0", "", {}, "sha512-7xgomUX6ADmcYzFik0HzAxh/73YlKR9bmFzf51CZwR+b6YtzU2m0u49hQCqV6SvlqIqsaxovfwdvbnsw3b/zpg=="], + "html-escaper": ["html-escaper@3.0.3", "", {}, "sha512-RuMffC89BOWQoY0WKGpIhn5gX3iI54O6nRA0yC124NYVtzjmFWBIiFd8M0x+ZdX0P9R4lADg1mgP8C7PxGOWuQ=="], "https-proxy-agent": ["https-proxy-agent@5.0.1", "", { "dependencies": { "agent-base": "6", "debug": "4" } }, "sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA=="], @@ -534,6 +545,8 @@ "jest-worker": ["jest-worker@27.5.1", "", { "dependencies": { "@types/node": "*", "merge-stream": "^2.0.0", "supports-color": "^8.0.0" } }, "sha512-7vuh85V5cdDofPyxn58nrPjBktZo0u9x1g8WtjQol+jZDaE+fhN+cIvTj11GndBnMnyfrUOG1sZQxCdjKh+DKg=="], + "joycon": ["joycon@3.1.1", "", {}, "sha512-34wB/Y7MW7bzjKRjUKTa46I2Z7eV62Rkhva+KkopW7Qvv/OSWBqvkSY7vusOPrNuZcUG3tApvdVgNB8POj3SPw=="], + "json-buffer": ["json-buffer@3.0.1", "", {}, "sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ=="], "json-schema-traverse": ["json-schema-traverse@0.4.1", "", {}, "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg=="], @@ -596,6 +609,8 @@ "minimatch": ["minimatch@10.2.5", "", { "dependencies": { "brace-expansion": "^5.0.5" } }, "sha512-MULkVLfKGYDFYejP07QOurDLLQpcjk7Fw+7jXS2R2czRQzR56yHRveU5NDJEOviH+hETZKSkIk5c+T23GjFUMg=="], + "minimist": ["minimist@1.2.8", "", {}, "sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA=="], + "minipass": ["minipass@7.1.3", "", {}, "sha512-tEBHqDnIoM/1rXME1zgka9g6Q2lcoCkxHLuc7ODJ5BxbP5d4c2Z5cGgtXAku59200Cx7diuHTOYfSBD8n6mm8A=="], "mrmime": ["mrmime@2.0.1", "", {}, "sha512-Y3wQdFg2Va6etvQ5I82yUhGdsKrcYox6p7FfL1LbK2J4V01F9TGlepTIhnK24t7koZibmg82KGglhA1XK5IsLQ=="], @@ -614,6 +629,8 @@ "on-exit-leak-free": ["on-exit-leak-free@2.1.2", "", {}, "sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA=="], + "once": ["once@1.4.0", "", { "dependencies": { "wrappy": "1" } }, "sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w=="], + "opener": ["opener@1.5.2", "", { "bin": { "opener": "bin/opener-bin.js" } }, "sha512-ur5UIdyw5Y7yEj9wLzhqXiy6GZ3Mwx0yGI+5sMn2r0N0v3cKJvUmFH5yPP+WXh9e0xfyzyJX95D8l088DNFj7A=="], "optionator": ["optionator@0.9.4", "", { "dependencies": { "deep-is": "^0.1.3", "fast-levenshtein": "^2.0.6", "levn": "^0.4.1", "prelude-ls": "^1.2.1", "type-check": "^0.4.0", "word-wrap": "^1.2.5" } }, "sha512-6IpQ7mKUxRcZNLIObR0hz7lxsapSSIYNZJwXPGeF0mTVqGKFIXj1DQcMoT22S3ROcLyY/rz0PWaWZ9ayWmad9g=="], @@ -660,6 +677,8 @@ "pino-abstract-transport": ["pino-abstract-transport@3.0.0", "", { "dependencies": { "split2": "^4.0.0" } }, "sha512-wlfUczU+n7Hy/Ha5j9a/gZNy7We5+cXp8YL+X+PG8S0KXxw7n/JXA3c46Y0zQznIJ83URJiwy7Lh56WLokNuxg=="], + "pino-pretty": ["pino-pretty@13.1.3", "", { "dependencies": { "colorette": "^2.0.7", "dateformat": "^4.6.3", "fast-copy": "^4.0.0", "fast-safe-stringify": "^2.1.1", "help-me": "^5.0.0", "joycon": "^3.1.1", "minimist": "^1.2.6", "on-exit-leak-free": "^2.1.0", "pino-abstract-transport": "^3.0.0", "pump": "^3.0.0", "secure-json-parse": "^4.0.0", "sonic-boom": "^4.0.1", "strip-json-comments": "^5.0.2" }, "bin": { "pino-pretty": "bin.js" } }, "sha512-ttXRkkOz6WWC95KeY9+xxWL6AtImwbyMHrL1mSwqwW9u+vLp/WIElvHvCSDg0xO/Dzrggz1zv3rN5ovTRVowKg=="], + "pino-std-serializers": ["pino-std-serializers@7.1.0", "", {}, "sha512-BndPH67/JxGExRgiX1dX0w1FvZck5Wa4aal9198SrRhZjH3GxKQUKIBnYJTdj2HDN3UQAS06HlfcSbQj2OHmaw=="], "pkg-dir": ["pkg-dir@4.2.0", "", { "dependencies": { "find-up": "^4.0.0" } }, "sha512-HRDzbaKjC+AOWVXxAU/x54COGeIv9eb+6CkDSQoNTt4XyWoIJvuPsXizxu/Fr23EiekbtZwmh1IcIG/l/a10GQ=="], @@ -688,6 +707,8 @@ "prr": ["prr@1.0.1", "", {}, "sha512-yPw4Sng1gWghHQWj0B3ZggWUm4qVbPwPFcRG8KyxiU7J2OHFSoEHKS+EZ3fv5l1t9CyCiop6l/ZYeWbrgoQejw=="], + "pump": ["pump@3.0.4", "", { "dependencies": { "end-of-stream": "^1.1.0", "once": "^1.3.1" } }, "sha512-VS7sjc6KR7e1ukRFhQSY5LM2uBWAUPiOPa/A3mkKmiMwSmRFUITt0xuj+/lesgnCv+dPIEYlkzrcyXgquIHMcA=="], + "punycode": ["punycode@2.3.1", "", {}, "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg=="], "quick-format-unescaped": ["quick-format-unescaped@4.0.4", "", {}, "sha512-tYC1Q1hgyRuHgloV/YXs2w15unPVh8qfu/qCTfhTYamaw7fyhumKa2yGpdSo87vY32rIclj+4fWYQXUMs9EHvg=="], @@ -726,6 +747,8 @@ "schema-utils": ["schema-utils@4.3.3", "", { "dependencies": { "@types/json-schema": "^7.0.9", "ajv": "^8.9.0", "ajv-formats": "^2.1.1", "ajv-keywords": "^5.1.0" } }, "sha512-eflK8wEtyOE6+hsaRVPxvUKYCpRgzLqDTb8krvAsRIwOGlHoSgYLgBXoubGgLd2fT41/OUYdb48v4k4WWHQurA=="], + "secure-json-parse": ["secure-json-parse@4.1.0", "", {}, "sha512-l4KnYfEyqYJxDwlNVyRfO2E4NTHfMKAWdUuA8J0yve2Dz/E/PdBepY03RvyJpssIpRFwJoCD55wA+mEDs6ByWA=="], + "semver": ["semver@7.8.2", "", { "bin": { "semver": "bin/semver.js" } }, "sha512-c8jsqUZm3omBOI66G90z1Dyw5z622G8oLG+omfsHBJf3CWQTlOcwOjvOG6wtiNfW6anKm/eA39LMwMtMez2TiQ=="], "set-function-length": ["set-function-length@1.2.2", "", { "dependencies": { "define-data-property": "^1.1.4", "es-errors": "^1.3.0", "function-bind": "^1.1.2", "get-intrinsic": "^1.2.4", "gopd": "^1.0.1", "has-property-descriptors": "^1.0.2" } }, "sha512-pgRc4hJ4/sNjWCSS9AmnS40x3bNMDTknHgL5UaMBTMyJnU90EgWh1Rz+MC9eFu4BuN/UwZjKQuY/1v3rM7HMfg=="], @@ -774,6 +797,8 @@ "strip-ansi-cjs": ["strip-ansi@6.0.1", "", { "dependencies": { "ansi-regex": "^5.0.1" } }, "sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A=="], + "strip-json-comments": ["strip-json-comments@5.0.3", "", {}, "sha512-1tB5mhVo7U+ETBKNf92xT4hrQa3pm0MZ0PQvuDnWgAAGHDsfp4lPSpiS6psrSiet87wyGPh9ft6wmhOMQ0hDiw=="], + "supports-color": ["supports-color@7.2.0", "", { "dependencies": { "has-flag": "^4.0.0" } }, "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw=="], "supports-preserve-symlinks-flag": ["supports-preserve-symlinks-flag@1.0.0", "", {}, "sha512-ot0WnXS9fgdkgIcePe6RHNk1WA8+muPa6cSjeR3V8K27q9BB1rTE3R1p7Hv0z1ZyAc8s6Vvv8DIyWf681MAt0w=="], @@ -854,6 +879,8 @@ "wrap-ansi-cjs": ["wrap-ansi@7.0.0", "", { "dependencies": { "ansi-styles": "^4.0.0", "string-width": "^4.1.0", "strip-ansi": "^6.0.0" } }, "sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q=="], + "wrappy": ["wrappy@1.0.2", "", {}, "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="], + "ws": ["ws@7.5.11", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": "^5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-zS54Oen9bITtp7kp2XM3AydrCIq1D+HwJOuH+c+e4LfpL/lotP5osijd+UoMnxwAam1GN8R4KtLAyIrIcBNpiA=="], "xtend": ["xtend@4.0.2", "", {}, "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ=="], diff --git a/data/db/entities/kline.entity.ts b/data/db/entities/kline.entity.ts index a3912c5..3f85a4b 100644 --- a/data/db/entities/kline.entity.ts +++ b/data/db/entities/kline.entity.ts @@ -20,21 +20,11 @@ import { Entity, PrimaryColumn, Column, - Index, CreateDateColumn, UpdateDateColumn, } from "typeorm"; -/** K 线周期枚举 */ -export type KlineInterval = - | "1m" - | "5m" - | "15m" - | "30m" - | "1h" - | "4h" - | "1d" - | "1w"; +import type { KlineInterval } from '../../types'; /** * 1 分钟 K 线 Hypertable @@ -56,26 +46,34 @@ export type KlineInterval = }, }, }) -@Index(["exchange", "symbol", "interval", "time"], { unique: true }) @Entity("klines") export class Kline { + /** + * 复合主键:交易所 + 交易对 + 周期 + 时间 + * + * 设计原因: + * - 不同 symbol 在同一时刻有各自的 K 线,单列 time PK 会导致跨 symbol 冲突 + * - 四列复合主键 = 业务唯一性语义,同时满足 TimescaleDB hypertable 要求 + * (分区列 time 必须包含在主键中) + * - 不再需要额外的 @Index unique — 复合主键已保证唯一性 + */ + /** 交易所标识(binance / okx / bybit) */ + @PrimaryColumn("text") + exchange!: string; + + /** 交易对符号(如 BTCUSDT) */ + @PrimaryColumn("text") + symbol!: string; + + /** K 线周期(1m) */ + @PrimaryColumn("text") + interval!: KlineInterval; + /** K 线开盘时间(UTC)— @timescaledb/typeorm 自动标记为时间分区列 */ @TimeColumn() @PrimaryColumn("timestamptz") time!: Date; - /** 交易所标识(binance / okx / bybit) */ - @Column("text") - exchange!: string; - - /** 交易对符号(如 BTCUSDT) */ - @Column("text") - symbol!: string; - - /** K 线周期(1m) */ - @Column("text") - interval!: KlineInterval; - // ============================================================ // OHLCV 价格数据(NUMERIC(20,8) 精度,与交易所对齐) // ============================================================ diff --git a/data/db/entities/trading-pair.entity.ts b/data/db/entities/trading-pair.entity.ts index ca4d86b..20f5e16 100644 --- a/data/db/entities/trading-pair.entity.ts +++ b/data/db/entities/trading-pair.entity.ts @@ -22,6 +22,8 @@ import { import { Exchange } from "./exchange.entity"; import { CommonBaseEntity } from "./common.entity"; +import type { KlineInterval } from '../../types'; + @Entity("trading_pairs") @Index(["exchange", "symbol"], { unique: true }) // 同一交易所下 symbol 唯一 @Index(["active"]) // 按激活状态快速筛选 @@ -71,6 +73,10 @@ export class TradingPair extends CommonBaseEntity { @Column("boolean", { default: true }) kline_synthesis_enabled!: boolean; + /** K 线时间周期 */ + @Column("varchar", { length: 100, default: "1m" }) + kline_interval!: KlineInterval; + /** K 线合成周期列表(逗号分隔,如 "1m,5m,15m,1h,4h,1d") */ @Column("varchar", { length: 100, default: "1m,5m,15m,1h,4h,1d" }) kline_intervals!: string; diff --git a/data/example/pair.ts b/data/example/pair.ts new file mode 100644 index 0000000..1606f94 --- /dev/null +++ b/data/example/pair.ts @@ -0,0 +1,77 @@ +import { logger } from "../utils/logger"; +import { AppDataSource } from "../db/data-source"; +import { Exchange } from "../db/entities/exchange.entity"; +import { TradingPair } from "../db/entities/trading-pair.entity"; + +interface PairSeed { + symbol: string; + baseAsset: string; + quoteAsset: string; +} + +const PAIRS: PairSeed[] = [ + { symbol: "BTCUSDT", baseAsset: "BTC", quoteAsset: "USDT" }, + { symbol: "ETHUSDT", baseAsset: "ETH", quoteAsset: "USDT" }, +]; + +async function run(): Promise { + logger.info("Seeding trading pairs..."); + + const exchangeRepo = AppDataSource.getRepository(Exchange); + const pairRepo = AppDataSource.getRepository(TradingPair); + + // 1. 确保 binance 交易所存在 + let exchange = await exchangeRepo.findOne({ where: { name: "binance" } }); + if (!exchange) { + exchange = exchangeRepo.create({ + name: "binance", + label: "Binance", + enabled: true, + }); + await exchangeRepo.save(exchange); + logger.info("Created exchange: binance"); + } else { + logger.info("Exchange already exists: binance"); + } + + // 2. 逐个插入交易对(跳过已存在的) + let created = 0; + let skipped = 0; + + for (const seed of PAIRS) { + const existing = await pairRepo.findOne({ + where: { + exchange: { id: exchange.id }, + symbol: seed.symbol, + }, + }); + + if (existing) { + logger.info({ symbol: seed.symbol }, "Trading pair already exists, skipping"); + skipped++; + continue; + } + + const pair = pairRepo.create({ + exchange, + symbol: seed.symbol, + base_asset: seed.baseAsset, + quote_asset: seed.quoteAsset, + active: true, + kline_synthesis_enabled: true, + }); + + await pairRepo.save(pair); + created++; + logger.info({ symbol: seed.symbol, id: pair.id }, "Created trading pair"); + } + + logger.info({ created, skipped, total: PAIRS.length }, "Seeding complete"); + await AppDataSource.destroy(); + process.exit(0); +} + +run().catch((err) => { + logger.error({ err }, "Seeding failed"); + process.exit(1); +}); diff --git a/data/exchanges/base.ts b/data/exchanges/base.ts deleted file mode 100644 index ddf1c19..0000000 --- a/data/exchanges/base.ts +++ /dev/null @@ -1,186 +0,0 @@ -// ============================================================ -// base.ts — 交易所适配器抽象基类 -// ============================================================ -// 所有交易所适配器(Binance / OKX / Bybit ...)继承此类, -// 复用指数退避重连、连接状态管理、限流等通用逻辑。 -// -// 子类只需实现: -// - connect() — 建立 WebSocket/REST 连接 -// - disconnect() — 断开连接并清理资源 -// - subscribeTicker() / subscribeTrade() / subscribeOrderbook() -// - fetchKlines() — REST 历史 K 线补拉 -// - fetchMarkets() — 交易对元数据拉取 -// ============================================================ - -import { Subject, type Observable } from "rxjs"; -import { logger } from "../utils/logger"; -import type { - MarketDataFeed, - Ticker, - Trade, - OrderBook, - Kline, - KlineInterval, - MarketInfo, - ConnectionState, - AdapterConfig, -} from "./types"; -import { DEFAULT_ADAPTER_CONFIG } from "./types"; - -// ============================================================ -// 工具:异步 sleep -// ============================================================ - -/** 返回一个在 ms 毫秒后 resolve 的 Promise */ -function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -// ============================================================ -// BaseExchangeAdapter -// ============================================================ - -export abstract class BaseExchangeAdapter implements MarketDataFeed { - /** 交易所标识(子类必须覆盖) */ - abstract readonly exchange: string; - - /** 适配器配置(可在子类构造函数中覆盖默认值) */ - protected readonly config: AdapterConfig; - - /** 当前连接状态 */ - protected _connectionState: ConnectionState = "disconnected"; - - /** 当前重连尝试次数(成功连接后重置) */ - protected reconnectAttempt = 0; - - /** Subject 清理注册表 —— disconnect 时统一 complete */ - protected activeSubjects = new Set>(); - - // ============================================================ - // 构造函数 - // ============================================================ - - constructor(config: Partial = {}) { - this.config = { ...DEFAULT_ADAPTER_CONFIG, ...config }; - } - - // ============================================================ - // 连接状态(只读暴露) - // ============================================================ - - get connectionState(): ConnectionState { - return this._connectionState; - } - - /** 更新连接状态并记录日志 */ - protected setConnectionState(state: ConnectionState): void { - const prev = this._connectionState; - this._connectionState = state; - if (prev !== state) { - logger.info( - { exchange: this.exchange, from: prev, to: state }, - `[${this.exchange}] connection state: ${prev} → ${state}`, - ); - } - } - - // ============================================================ - // 指数退避重连(所有子类复用) - // ============================================================ - - /** - * 执行指数退避重连。 - * - * 延迟公式:delay = baseDelay × 2^min(attempt, 5) - * - attempt=0: 3s - * - attempt=1: 6s - * - attempt=2: 12s - * - attempt=5: 96s(之后不再翻倍) - * - * 超过 maxReconnectAttempts 后抛出错误。 - * - * @throws 达到最大重试次数后抛出 - */ - protected async reconnect(): Promise { - const { reconnectBaseDelayMs: baseDelay, maxReconnectAttempts } = this.config; - - if (this.reconnectAttempt >= maxReconnectAttempts) { - this.setConnectionState("error"); - throw new Error( - `[${this.exchange}] 重连失败:已达最大重试次数 (${maxReconnectAttempts})`, - ); - } - - const cappedAttempt = Math.min(this.reconnectAttempt, 5); - const delay = baseDelay * Math.pow(2, cappedAttempt); - - logger.warn( - { - exchange: this.exchange, - attempt: this.reconnectAttempt + 1, - maxAttempts: maxReconnectAttempts, - delayMs: delay, - }, - `[${this.exchange}] WebSocket 重连中...`, - ); - - await sleep(delay); - - this.reconnectAttempt++; - this.setConnectionState("connecting"); - await this.connect(); - } - - /** 成功连接后重置重连计数器 */ - protected resetReconnectAttempts(): void { - if (this.reconnectAttempt > 0) { - logger.info( - { exchange: this.exchange, attempts: this.reconnectAttempt }, - `[${this.exchange}] 重连成功,计数器重置`, - ); - } - this.reconnectAttempt = 0; - } - - // ============================================================ - // Subject 管理工具 - // ============================================================ - - /** - * 创建一个受管理的 Subject,disconnect 时自动 complete。 - * 子类在 subscribe* 方法中使用此工具创建 Subject。 - */ - protected createManagedSubject(): Subject { - const subject = new Subject(); - this.activeSubjects.add(subject as Subject); - return subject; - } - - /** 完成所有受管理的 Subject(disconnect 时调用) */ - protected completeAllSubjects(): void { - for (const subject of this.activeSubjects) { - subject.complete(); - } - this.activeSubjects.clear(); - } - - // ============================================================ - // 抽象方法 —— 子类必须实现 - // ============================================================ - - abstract connect(): Promise; - abstract disconnect(): Promise; - abstract subscribeTicker(symbols: string[]): Observable; - abstract subscribeTrade(symbols: string[]): Observable; - abstract subscribeOrderbook(symbol: string, depth?: number): Observable; - abstract fetchKlines( - symbol: string, - interval: KlineInterval, - startTime: number, - endTime: number, - limit?: number, - ): Promise; - abstract fetchMarkets(): Promise; -} - -export default BaseExchangeAdapter; diff --git a/data/exchanges/base_rest.ts b/data/exchanges/base_rest.ts new file mode 100644 index 0000000..02ae80f --- /dev/null +++ b/data/exchanges/base_rest.ts @@ -0,0 +1,109 @@ +// ============================================================ +// base.ts — REST 客户端抽象基类 +// ============================================================ +// 各交易所适配器继承此类,复用 REST 请求限流、重试等通用逻辑。 +// 子类通过注入各交易所原生 SDK(binance / okx / bybit ...) +// 实现具体的数据拉取,无需依赖 ccxt。 +// +// 子类只需实现: +// - fetchKlines() — 历史 K 线拉取(基于目标交易所 SDK) +// - fetchMarkets() — 交易对元数据拉取(用于自动注册交易对) +// +// WebSocket 实时行情由各适配器自行管理,不在此基类范围内。 +// ============================================================ + +import { logger } from "../utils/logger"; +import type { Kline, KlineInterval, MarketInfo, RestClientConfig } from "../types"; +import { DEFAULT_REST_CONFIG } from "../types/base"; + +// ============================================================ +// 工具:异步 sleep +// ============================================================ + +/** 返回一个在 ms 毫秒后 resolve 的 Promise */ +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// ============================================================ +// BaseRestClient +// ============================================================ + +export abstract class BaseRestClient { + /** 交易所标识(子类必须覆盖) */ + abstract readonly exchange: string; + + /** REST 客户端配置(可在子类构造函数中覆盖默认值) */ + protected readonly config: RestClientConfig; + + /** REST 请求节流 Map(key → 上次请求时间戳) */ + private lastRestFetch = new Map(); + + // ============================================================ + // 构造函数 + // ============================================================ + + constructor(config: Partial = {}) { + this.config = { ...DEFAULT_REST_CONFIG, ...config }; + } + + // ============================================================ + // 限流工具 + // ============================================================ + + /** + * 对同一 key 的 REST 请求进行冷却节流。 + * 若距离上次请求不足 restRateLimitMs,自动等待剩余时间。 + * + * @param key - 节流标识(如 `${symbol}:${interval}`) + */ + protected async throttle(key: string): Promise { + const lastFetch = this.lastRestFetch.get(key) ?? 0; + const elapsed = Date.now() - lastFetch; + if (elapsed < this.config.restRateLimitMs) { + const waitMs = this.config.restRateLimitMs - elapsed; + logger.debug( + { exchange: this.exchange, key, waitMs }, + `[${this.exchange}] REST 限流等待 ${waitMs}ms`, + ); + await sleep(waitMs); + } + this.lastRestFetch.set(key, Date.now()); + } + + // ============================================================ + // 抽象方法 —— 子类必须使用各自的交易所 SDK 实现 + // ============================================================ + + /** + * 拉取历史 K 线数据(REST)。 + * + * 子类负责: + * 1. 调用交易所原生 SDK 的 K 线接口 + * 2. 将原始数据转换为本系统标准化 Kline 结构 + * 3. 处理分页逻辑(若时间跨度超过单次请求上限) + * + * @param symbol - 交易对符号(如 BTCUSDT) + * @param interval - K 线周期 + * @param startTime - 起始时间(Unix ms) + * @param endTime - 结束时间(Unix ms) + * @param limit - 单次最大条数(默认取自 config.defaultLimit) + */ + abstract fetchKlines( + symbol: string, + interval: KlineInterval, + startTime: number, + endTime: number, + limit?: number, + ): Promise; + + /** + * 获取交易所交易对信息(REST)。 + * + * 子类负责调用交易所原生 SDK 的 /exchangeInfo 等价接口, + * 并转换为本系统标准化 MarketInfo 结构。 + */ + abstract fetchMarkets(): Promise; +} + +export default BaseRestClient; diff --git a/data/exchanges/binance.ts b/data/exchanges/binance.ts deleted file mode 100644 index 923629e..0000000 --- a/data/exchanges/binance.ts +++ /dev/null @@ -1,782 +0,0 @@ -// ============================================================ -// binance.ts — Binance 交易所适配器 -// ============================================================ -// 基于 Binance 官方 SDK(binance@3.x)实现 MarketDataFeed 接口。 -// -// WebSocket:使用 SDK 内置 WebsocketClient,自动处理多路复用、 -// 断线重连、心跳保活。通过 formattedMessage 事件接收已解析的 -// 类型化行情数据,转换为本系统标准化结构后通过 RxJS Subject 发布。 -// -// REST:使用 SDK 内置 MainClient(Spot),用于: -// - fetchKlines() 历史 K 线补拉 -// - fetchMarkets() 交易对元数据(用于自动注册到 trading_pairs 表) -// -// ============================================================ -// 风险提示: -// - Binance WebSocket 单连接最多订阅 1024 个 stream, -// 超出需拆分多连接(SDK 自动处理) -// - 生产环境建议使用 Binance 的 combined streams 合并请求 -// - REST API 限频:1200 请求/分钟(权重制),fetchKlines 权重 2 -// ============================================================ - -import { Subject, type Observable } from "rxjs"; -import { - WebsocketClient, - MainClient, - type WsMessageKlineFormatted, - type WsMessageTradeFormatted, - type WsMessage24hrTickerFormatted, - type WsMessageBookTickerEventFormatted, - type WsMessagePartialBookDepthEventFormatted, - type WsFormattedMessage, -} from "binance"; -import type { Kline as BinanceRestKline } from "binance"; -import { BaseExchangeAdapter } from "./base"; -import { logger } from "../utils/logger"; -import type { - Ticker, - Trade, - OrderBook, - Kline, - KlineInterval, - MarketInfo, - AdapterConfig, - BinanceRawKline, -} from "./types"; -import { KLINE_INTERVAL_MS } from "./types"; - -// ============================================================ -// Binance K 线周期 ← → 本系统 K 线周期映射 -// ============================================================ - -/** - * Binance SDK 支持的 K 线周期(比本系统更多)。 - * 本系统仅使用其中的子集,其余周期由 pipeline 合成。 - */ -type BinanceKlineInterval = - | "1m" - | "5m" - | "15m" - | "30m" - | "1h" - | "4h" - | "1d" - | "1w"; - -/** 本系统 KlineInterval → Binance SDK KlineInterval(1:1 子集映射) */ -const INTERVAL_TO_BINANCE: Record = { - "1m": "1m", - "5m": "5m", - "15m": "15m", - "30m": "30m", - "1h": "1h", - "4h": "4h", - "1d": "1d", - "1w": "1w", -}; - -// ============================================================ -// 默认适配器配置(Binance 专用覆盖) -// ============================================================ - -const DEFAULT_BINANCE_CONFIG: AdapterConfig = { - reconnectBaseDelayMs: 3000, - maxReconnectAttempts: 10, - /** Binance REST API 权重制限频,保守设为 250ms */ - restRateLimitMs: 250, -}; - -// ============================================================ -// BinanceAdapter -// ============================================================ - -export class BinanceAdapter extends BaseExchangeAdapter { - readonly exchange = "binance"; - - // ---------------------------------------------------------- - // SDK 客户端实例 - // ---------------------------------------------------------- - - /** Binance WebSocket 客户端(内置多路复用 + 自动重连) */ - private wsClient!: WebsocketClient; - - /** Binance REST 客户端(Spot) */ - private restClient!: MainClient; - - // ---------------------------------------------------------- - // RxJS Subject —— 按事件类型分频道发布 - // ---------------------------------------------------------- - - /** 24h Ticker 流(合并所有已订阅 symbol) */ - private tickerSubject!: Subject; - - /** 逐笔成交流(合并所有已订阅 symbol) */ - private tradeSubject!: Subject; - - /** 订单簿深度流(合并所有已订阅 symbol) */ - private orderbookSubjects = new Map>(); - - // ---------------------------------------------------------- - // 订阅追踪 - // ---------------------------------------------------------- - - /** 当前已订阅的 ticker symbol 集合 */ - private subscribedTickerSymbols = new Set(); - - /** 当前已订阅的 trade symbol 集合 */ - private subscribedTradeSymbols = new Set(); - - /** 当前已订阅的 orderbook symbol → depth 映射 */ - private subscribedOrderbookDepths = new Map(); - - /** 防止重复 REST 请求的节流 Map(symbol:interval → lastFetchTime) */ - private lastRestFetch = new Map(); - - // ============================================================ - // 构造函数 - // ============================================================ - - constructor(config: Partial = {}) { - super({ ...DEFAULT_BINANCE_CONFIG, ...config }); - } - - // ============================================================ - // 连接管理 - // ============================================================ - - /** - * 建立 WebSocket 连接并注册事件监听。 - * - * Binance SDK 的 WebsocketClient 在首次 subscribe() 时自动建连, - * 此处主动调用 connectPublic() 预热连接并注册 formattedMessage 监听。 - */ - async connect(): Promise { - if (this._connectionState === "connected") { - logger.debug(`[binance] 已连接,跳过重复 connect`); - return; - } - - this.setConnectionState("connecting"); - - try { - // 初始化 WebSocket 客户端 - this.wsClient = new WebsocketClient({ - // 生产环境使用 Binance 主网 - // useTestnet: false 为默认值 - }); - - // 初始化 REST 客户端(公开接口无需 API Key) - this.restClient = new MainClient(); - - // 注册 formattedMessage 事件 —— SDK 将原始 JSON 解析为类型化对象 - this.wsClient.on("formattedMessage", this.onFormattedMessage.bind(this)); - - // 注册重连事件(利用 SDK 内置的自动重连) - this.wsClient.on("reconnecting", (evt) => { - logger.warn( - { wsKey: evt.wsKey }, - `[binance] WebSocket 重连中...`, - ); - this.setConnectionState("connecting"); - }); - - this.wsClient.on("reconnected", (evt) => { - logger.info( - { wsKey: evt.wsKey, wsUrl: evt.wsUrl }, - `[binance] WebSocket 重连成功`, - ); - this.setConnectionState("connected"); - this.resetReconnectAttempts(); - }); - - this.wsClient.on("close", (evt) => { - logger.warn( - { wsKey: evt.wsKey }, - `[binance] WebSocket 连接关闭`, - ); - // 如果之前是已连接状态,SDK 会自动重连 - if (this._connectionState === "connected") { - this.setConnectionState("connecting"); - } - }); - - // 预热连接(SDK 连接到 spot 公开行情端点) - await this.wsClient.connectPublic(); - this.setConnectionState("connected"); - this.resetReconnectAttempts(); - - logger.info(`[binance] WebSocket 连接已建立`); - - } catch (err) { - this.setConnectionState("error"); - logger.error({ err }, `[binance] 连接失败`); - throw err; - } - } - - /** - * 断开连接并清理资源。 - * - * 1. 取消所有 WebSocket 订阅 - * 2. 关闭 WebSocket 客户端 - * 3. Complete 所有 RxJS Subject - */ - async disconnect(): Promise { - logger.info(`[binance] 断开连接...`); - - try { - // 取消所有订阅 - if (this.wsClient && this.subscribedTradeSymbols.size > 0) { - const topics = [...this.subscribedTradeSymbols].map( - (s) => `${s.toLowerCase()}@trade`, - ); - await this.wsClient.unsubscribe(topics, "main"); - } - if (this.wsClient && this.subscribedTickerSymbols.size > 0) { - const topics = [...this.subscribedTickerSymbols].map( - (s) => `${s.toLowerCase()}@ticker`, - ); - await this.wsClient.unsubscribe(topics, "main"); - } - if (this.wsClient && this.subscribedOrderbookDepths.size > 0) { - for (const [symbol, depth] of this.subscribedOrderbookDepths) { - const topic = `${symbol.toLowerCase()}@depth${depth}@100ms`; - await this.wsClient.unsubscribe([topic], "main"); - } - } - } catch (err) { - logger.warn({ err }, `[binance] 取消订阅时出错(忽略)`); - } - - // 关闭 WS 客户端所有连接(SDK 自动关闭底层 WebSocket) - try { - this.wsClient?.closeAll(); - } catch { - // 忽略关闭错误 - } - - // Complete 所有 Subject - this.tickerSubject?.complete(); - this.tradeSubject?.complete(); - for (const subject of this.orderbookSubjects.values()) { - subject.complete(); - } - this.orderbookSubjects.clear(); - - this.subscribedTickerSymbols.clear(); - this.subscribedTradeSymbols.clear(); - this.subscribedOrderbookDepths.clear(); - - this.setConnectionState("disconnected"); - logger.info(`[binance] 已断开连接`); - } - - // ============================================================ - // 订阅 Ticker(24h 滚动统计) - // ============================================================ - - subscribeTicker(symbols: string[]): Observable { - if (!this.tickerSubject) { - this.tickerSubject = this.createManagedSubject(); - } - - const newSymbols = symbols.filter( - (s) => !this.subscribedTickerSymbols.has(s), - ); - - if (newSymbols.length > 0 && this._connectionState === "connected") { - const topics = newSymbols.map( - (s) => `${s.toLowerCase()}@ticker`, - ); - this.wsClient.subscribe(topics, "main").catch((err) => { - logger.error({ err, symbols: newSymbols }, `[binance] 订阅 ticker 失败`); - }); - for (const s of newSymbols) { - this.subscribedTickerSymbols.add(s); - } - logger.info( - { count: newSymbols.length, symbols: newSymbols }, - `[binance] 订阅 ticker`, - ); - } - - return this.tickerSubject.asObservable(); - } - - // ============================================================ - // 订阅逐笔成交 - // ============================================================ - - subscribeTrade(symbols: string[]): Observable { - if (!this.tradeSubject) { - this.tradeSubject = this.createManagedSubject(); - } - - const newSymbols = symbols.filter( - (s) => !this.subscribedTradeSymbols.has(s), - ); - - if (newSymbols.length > 0 && this._connectionState === "connected") { - const topics = newSymbols.map( - (s) => `${s.toLowerCase()}@trade`, - ); - this.wsClient.subscribe(topics, "main").catch((err) => { - logger.error({ err, symbols: newSymbols }, `[binance] 订阅 trade 失败`); - }); - for (const s of newSymbols) { - this.subscribedTradeSymbols.add(s); - } - logger.info( - { count: newSymbols.length, symbols: newSymbols }, - `[binance] 订阅 trade`, - ); - } - - return this.tradeSubject.asObservable(); - } - - // ============================================================ - // 订阅订单簿深度 - // ============================================================ - - subscribeOrderbook(symbol: string, depth: number = 20): Observable { - const key = `${symbol}@${depth}`; - let subject = this.orderbookSubjects.get(key); - if (subject) { - return subject.asObservable(); - } - - subject = this.createManagedSubject(); - this.orderbookSubjects.set(key, subject); - - if (this._connectionState === "connected") { - const topic = `${symbol.toLowerCase()}@depth${depth}@100ms`; - this.wsClient.subscribe([topic], "main").catch((err) => { - logger.error({ err, symbol, depth }, `[binance] 订阅 orderbook 失败`); - }); - this.subscribedOrderbookDepths.set(symbol, depth); - logger.info({ symbol, depth }, `[binance] 订阅 orderbook`); - } - - return subject.asObservable(); - } - - // ============================================================ - // REST:拉取历史 K 线(补缺失数据 / 回测) - // ============================================================ - - /** - * 通过 Binance REST API 拉取历史 K 线。 - * - * Binance 限制: - * - 单次最多 1000 条(默认 500) - * - 权重 2(1200 权重/分钟 → 600 次请求/分钟) - * - 自动分页逻辑:如果时间跨度超过 limit 条,自动多次请求拼接 - * - * @param symbol - 交易对(如 BTCUSDT) - * @param interval - K 线周期 - * @param startTime - 起始时间(Unix ms) - * @param endTime - 结束时间(Unix ms) - * @param limit - 单次最大条数(默认 500,最大 1000) - */ - async fetchKlines( - symbol: string, - interval: KlineInterval, - startTime: number, - endTime: number, - limit: number = 500, - ): Promise { - const binanceInterval = INTERVAL_TO_BINANCE[interval]; - const intervalMs = KLINE_INTERVAL_MS[interval]; - const maxLimit = Math.min(limit, 1000); // Binance 硬限制 1000 - - const allKlines: Kline[] = []; - let currentStart = startTime; - - // 自动分页:如果时间跨度超过 maxLimit 条 K 线,分批拉取 - while (currentStart < endTime) { - // 速率限制(保守节流) - const throttleKey = `${symbol}:${interval}`; - const lastFetch = this.lastRestFetch.get(throttleKey) ?? 0; - const elapsed = Date.now() - lastFetch; - if (elapsed < this.config.restRateLimitMs) { - await new Promise((r) => - setTimeout(r, this.config.restRateLimitMs - elapsed), - ); - } - this.lastRestFetch.set(throttleKey, Date.now()); - - try { - const rawKlines = await this.restClient.getKlines({ - symbol, - interval: binanceInterval, - startTime: currentStart, - endTime, - limit: maxLimit, - }); - - if (!rawKlines || rawKlines.length === 0) { - break; // 无更多数据 - } - - // 转换 Binance REST K 线 → 本系统标准化 K 线 - const converted = rawKlines.map((k) => - this.convertRestKline(k, symbol, interval), - ); - allKlines.push(...converted); - - // Binance REST K 线格式:[openTime, open, high, low, close, volume, closeTime, ...] - // 最后一条的开盘时间 + interval 作为下一批的起点 - const lastKline = rawKlines[rawKlines.length - 1]!; - const lastOpenTime = (lastKline as number[])[0] as number; - currentStart = lastOpenTime + intervalMs; - - // 如果返回数量 < limit,说明已拉完 - if (rawKlines.length < maxLimit) { - break; - } - - } catch (err) { - logger.error( - { err, symbol, interval, currentStart, endTime }, - `[binance] fetchKlines 失败`, - ); - throw err; - } - } - - logger.debug( - { symbol, interval, count: allKlines.length, startTime, endTime }, - `[binance] fetchKlines 完成`, - ); - - return allKlines; - } - - // ============================================================ - // REST:拉取交易对元数据 - // ============================================================ - - /** - * 从 Binance 获取所有现货交易对信息,转换为本系统 MarketInfo 格式。 - * - * 用于自动注册到 trading_pairs 表,避免手动配置。 - */ - async fetchMarkets(): Promise { - logger.info(`[binance] 拉取交易对信息...`); - - try { - const exchangeInfo = await this.restClient.getExchangeInfo(); - - const markets: MarketInfo[] = []; - - for (const symbolInfo of exchangeInfo.symbols) { - // 仅保留状态为 TRADING 的现货交易对 - if (symbolInfo.status !== "TRADING") continue; - - const filters = symbolInfo.filters; - - // 从 filters 中提取交易规则 - let tickSize: string | undefined; - let stepSize: string | undefined; - let minQty: string | undefined; - let minNotional: string | undefined; - - for (const filter of filters) { - switch (filter.filterType) { - case "PRICE_FILTER": - tickSize = (filter as { tickSize: string }).tickSize; - break; - case "LOT_SIZE": - stepSize = (filter as { stepSize: string }).stepSize; - minQty = (filter as { minQty: string }).minQty; - break; - case "MIN_NOTIONAL": - case "NOTIONAL": - minNotional = (filter as { minNotional: string }).minNotional; - break; - } - } - - markets.push({ - symbol: symbolInfo.symbol, - baseAsset: symbolInfo.baseAsset, - quoteAsset: symbolInfo.quoteAsset, - pricePrecision: symbolInfo.quoteAssetPrecision, - quantityPrecision: symbolInfo.baseAssetPrecision, - minQty: minQty ? parseFloat(minQty) : undefined, - stepSize: stepSize ? parseFloat(stepSize) : undefined, - minNotional: minNotional ? parseFloat(minNotional) : undefined, - }); - } - - logger.info( - { count: markets.length }, - `[binance] 交易对信息拉取完成`, - ); - - return markets; - - } catch (err) { - logger.error({ err }, `[binance] fetchMarkets 失败`); - throw err; - } - } - - // ============================================================ - // 内部:formattedMessage 事件分发 - // ============================================================ - - /** - * Binance SDK formattedMessage 回调。 - * - * SDK 已将原始 WebSocket JSON 解析为类型化事件对象。 - * WsFormattedMessage 是复杂联合类型(含单事件 + 事件数组), - * TypeScript 的判别联合在此处不够精确。内部使用 `as any` 绕过 - * 联合类型限制,按 eventType 字符串运行时路由。 - */ - private onFormattedMessage(msg: WsFormattedMessage): void { - try { - // 数组类型(如 !ticker@arr → WsMessage24hrTickerFormatted[]) - if (Array.isArray(msg)) { - for (const item of msg) { - this.routeByEventType(item as unknown as Record); - } - return; - } - - this.routeByEventType(msg as unknown as Record); - } catch (err) { - const raw = msg as unknown as Record; - const eventType = String(raw["eventType"] ?? "unknown"); - logger.error( - { err, eventType }, - `[binance] 处理 formattedMessage 时出错`, - ); - } - } - - /** - * 按 eventType 运行时路由到对应 Subject。 - * - * 此处使用 unknown → Record 转换,因为 WsFormattedMessage - * 联合类型包含数组成员导致无法直接访问 eventType。 - */ - private routeByEventType(raw: Record): void { - const eventType = String(raw["eventType"] ?? ""); - if (!eventType) return; - - switch (eventType) { - case "24hrTicker": - case "!ticker@arr": - this.handleTickerMessage( - raw as unknown as WsMessage24hrTickerFormatted, - ); - break; - - case "trade": - this.handleTradeMessage( - raw as unknown as WsMessageTradeFormatted, - ); - break; - - case "bookTicker": - this.handleBookTickerMessage( - raw as unknown as WsMessageBookTickerEventFormatted, - ); - break; - - case "partialBookDepth": - this.handleOrderbookMessage( - raw as unknown as WsMessagePartialBookDepthEventFormatted, - ); - break; - - case "kline": - // K 线事件不在 adapter 层分发,由 pipeline 的 KlineSynthesizer 处理 - break; - - default: - // 忽略其他事件类型(用户数据流、账户更新等) - break; - } - } - - // ---------------------------------------------------------- - // 事件转换器:Binance → 本系统标准化类型 - // ---------------------------------------------------------- - - /** 24h Ticker → Ticker */ - private handleTickerMessage(msg: WsMessage24hrTickerFormatted): void { - if (!this.tickerSubject || this.tickerSubject.closed) return; - - const ticker: Ticker = { - exchange: "binance", - symbol: msg.symbol, - lastPrice: msg.currentClose, - openPrice: msg.open, - highPrice: msg.high, - lowPrice: msg.low, - volume: msg.baseAssetVolume, - quoteVolume: msg.quoteAssetVolume, - priceChange: msg.priceChange, - priceChangePercent: msg.priceChangePercent, - bidPrice: msg.bestBid, - bidQty: msg.bestBidQuantity, - askPrice: msg.bestAskPrice, - askQty: msg.bestAskQuantity, - eventTime: msg.eventTime, - closeTime: msg.closeTime, - }; - - this.tickerSubject.next(ticker); - } - - /** 逐笔成交 → Trade */ - private handleTradeMessage(msg: WsMessageTradeFormatted): void { - if (!this.tradeSubject || this.tradeSubject.closed) return; - - const trade: Trade = { - exchange: "binance", - symbol: msg.symbol, - price: msg.price, - amount: msg.quantity, - quoteAmount: msg.price * msg.quantity, - timestamp: msg.time, - isBuyerMaker: msg.maker, - tradeId: String(msg.tradeId), - }; - - this.tradeSubject.next(trade); - } - - /** BookTicker → Ticker(精简版,仅有最佳买卖价) */ - private handleBookTickerMessage(msg: WsMessageBookTickerEventFormatted): void { - // BookTicker 是 Ticker 的精简版,仅更新最佳买卖价 - // 如果有 tickerSubject,将其作为轻量 Ticker 推送 - if (!this.tickerSubject || this.tickerSubject.closed) return; - - const ticker: Ticker = { - exchange: "binance", - symbol: msg.symbol, - lastPrice: 0, // bookTicker 不含最新价 - openPrice: 0, - highPrice: 0, - lowPrice: 0, - volume: 0, - quoteVolume: 0, - priceChange: 0, - priceChangePercent: 0, - bidPrice: msg.bidPrice, - bidQty: msg.bidQty, - askPrice: msg.askPrice, - askQty: msg.askQty, - eventTime: msg.eventTime, - closeTime: 0, - }; - - this.tickerSubject.next(ticker); - } - - /** 订单簿深度快照 → OrderBook */ - private handleOrderbookMessage(msg: WsMessagePartialBookDepthEventFormatted): void { - // partialBookDepth 不含 symbol 字段(取决于 SDK 版本), - // 从 stream 名称或上下文推断 symbol。此处假设 SDK 已填充 symbol。 - const symbol = (msg as WsMessagePartialBookDepthEventFormatted & { symbol?: string }).symbol; - - if (!symbol) { - logger.warn(`[binance] 收到无 symbol 的 orderbook 消息,丢弃`); - return; - } - - // 查找匹配的 orderbook Subject(遍历所有已订阅 depth) - for (const [key, subject] of this.orderbookSubjects) { - const [subscribedSymbol] = key.split("@"); - if ( - subscribedSymbol?.toUpperCase() === symbol.toUpperCase() && - !subject.closed - ) { - const orderbook: OrderBook = { - exchange: "binance", - symbol: symbol.toUpperCase(), - bids: msg.bids.map( - ([price, qty]) => [parseFloat(String(price)), parseFloat(String(qty))] as [number, number], - ), - asks: msg.asks.map( - ([price, qty]) => [parseFloat(String(price)), parseFloat(String(qty))] as [number, number], - ), - lastUpdateId: msg.lastUpdateId, - eventTime: Date.now(), // partialBookDepth 不含 eventTime - }; - - subject.next(orderbook); - return; - } - } - } - - // ============================================================ - // 内部:REST K 线格式转换 - // ============================================================ - - /** - * 将 Binance REST K 线数组(元组)转换为本系统 Kline 对象。 - * - * Binance REST K 线格式: - * [ - * 0: openTime (ms), - * 1: open (string), - * 2: high (string), - * 3: low (string), - * 4: close (string), - * 5: volume (string), - * 6: closeTime (ms), - * 7: quoteVolume (string), - * 8: tradeCount (number), - * 9: takerBuyBaseVol (string), - * 10: takerBuyQuoteVol (string), - * 11: ignore (string) - * ] - */ - private convertRestKline( - raw: BinanceRestKline, - symbol: string, - interval: KlineInterval, - ): Kline { - // BinanceRestKline 是元组类型,按位置索引 - const arr = raw as unknown as [ - number, // 0: openTime - string, // 1: open - string, // 2: high - string, // 3: low - string, // 4: close - string, // 5: volume - number, // 6: closeTime - string, // 7: quoteVolume - number, // 8: tradeCount - string, // 9: takerBuyBaseVol - string, // 10: takerBuyQuoteVol - string, // 11: ignore - ]; - - return { - exchange: "binance", - symbol, - interval, - openTime: arr[0], - closeTime: arr[6], - open: parseFloat(arr[1]), - high: parseFloat(arr[2]), - low: parseFloat(arr[3]), - close: parseFloat(arr[4]), - volume: parseFloat(arr[5]), - quoteVolume: parseFloat(arr[7]), - takerBuyBaseVol: parseFloat(arr[9]), - takerBuyQuoteVol: parseFloat(arr[10]), - tradeCount: arr[8], - isClosed: true, // REST 返回的 K 线都是已闭合的 - }; - } -} - -export default BinanceAdapter; diff --git a/data/exchanges/rest.ts b/data/exchanges/rest.ts new file mode 100644 index 0000000..1a52d09 --- /dev/null +++ b/data/exchanges/rest.ts @@ -0,0 +1,253 @@ +import { MainClient, type Kline as BinanceRestKline } from "binance"; + +import { logger } from "../utils/logger"; +import { BaseRestClient } from './base_rest'; +import type { KlineInterval, Kline, MarketInfo } from '../types'; + +/** K 线周期 → 毫秒数映射(用于时间桶计算) */ +export const KLINE_INTERVAL_MS: Record = { + "1m": 60_000, + "5m": 300_000, + "15m": 900_000, + "30m": 1_800_000, + "1h": 3_600_000, + "4h": 14_400_000, + "1d": 86_400_000, + "1w": 604_800_000, +}; + +// ============================================================ +// Binance REST K 线 → 本系统标准化 Kline 转换 +// ============================================================ + +/** + * Binance SDK Kline 元组格式(getKlines / getUIKlines 返回): + * [0] openTime: number — 开盘时间(Unix ms) + * [1] open: numberInString — 开盘价 + * [2] high: numberInString — 最高价 + * [3] low: numberInString — 最低价 + * [4] close: numberInString — 收盘价 + * [5] volume: numberInString — 成交量(base 币种) + * [6] closeTime: number — 收盘时间(Unix ms) + * [7] quoteVolume: numberInString — 成交额(quote 币种) + * [8] tradeCount: number — 成交笔数 + * [9] takerBuyBaseVol: numberInString — 主动买入成交量 + * [10] takerBuyQuoteVol: numberInString — 主动买入成交额 + * [11] ignore: numberInString — 忽略字段 + * + * numberInString = string | number,通过 Number() 统一转换。 + * + * 参考:node_modules/binance/lib/types/shared.d.ts:85-98 + */ +function convertBinanceKline( + raw: BinanceRestKline, + symbol: string, + interval: KlineInterval, +): Kline { + const [ + openTime, + open, + high, + low, + close, + volume, + closeTime, + quoteVolume, + tradeCount, + takerBuyBaseVol, + takerBuyQuoteVol, + // [11] ignore — 丢弃 + ] = raw; + + return { + exchange: "binance", + symbol, + interval, + openTime: openTime, + closeTime: closeTime, + open: String(open), + high: String(high), + low: String(low), + close: String(close), + volume: String(volume), + quoteVolume: String(quoteVolume), + takerBuyBaseVol: String(takerBuyBaseVol), + takerBuyQuoteVol: String(takerBuyQuoteVol), + tradeCount: String(tradeCount), + isClosed: true, // REST 返回的 K 线均为已闭合历史数据 + }; +} + +// ============================================================ +// Binance REST 拉取函数 +// ============================================================ + +/** + * 通过 Binance 原生 SDK 拉取 UI K 线并转换为本系统 Kline。 + * + * getUIKlines 与 getKlines 返回同构的 Kline[] 元组, + * getUIKlines 额外支持 timeZone 参数,适合按交易所时区对齐。 + * + * @param symbol - 交易对(如 BTCUSDT) + * @param interval - K 线周期 + * @param startTime - 起始时间(Unix ms) + * @param endTime - 结束时间(Unix ms),可选 + * @param limit - 单次拉取条数,默认 500(最大 1000) + */ +async function fetchBinanceKlines( + symbol: string, + interval: KlineInterval, + startTime: number, + endTime?: number, + limit = 500, +): Promise { + const client = new MainClient({ + api_key: 'ONSJKIGRpDYLn6FdV17aAKfjclZ4I2LzamflhuMpsoRQA427lLKeyJlGtg2RZ7DH', + api_secret: '5Mfv4TgvDlRzCHbtl2nJL4mVHUvMm8pyjKiRjMoosBMxrhlqMw6CuQbg2qbS2Npd', + }); + + // Binance 硬限制:单次最多 1000 条 + const safeLimit = Math.min(limit, 1000); + + const rawKlines = await client.getKlines({ + symbol, + interval, + startTime, + endTime, + limit: safeLimit, + }); + + logger.info({ + symbol, + interval, + startTime, + endTime, + limit: safeLimit, + }, 'fetchBinanceKlines arguments'); + + if (!rawKlines || rawKlines.length === 0) { + return []; + } + + return filterConsecutive( + rawKlines.map((k, index) => { + // if (index === rawKlines.length - 1) { + // console.log(k); + // } + return convertBinanceKline(k, symbol, interval); + }), + interval, + ); +} + +/** + * 过滤出严格连续(无时间缺口)的 K 线序列。 + * + * 处理流程: + * 1. 按 openTime 升序排序(防御性,确保时间单调递增) + * 2. 从首条 K 线开始遍历,仅保留相邻间隔恰好等于 intervalMs 的条目 + * 3. 一旦检测到缺口(间隔 ≠ intervalMs),立即终止并丢弃后续所有数据 + * + * 设计意图: + * - 时间序列分析(回测、指标计算)依赖连续数据,缺口会引入偏误 + * - 缺口之后的数据可能来自另一段不连续的拉取结果,混入后风险更高 + * - "截断"策略优于"填充/跳过",避免伪造数据或隐藏数据质量问题 + * + * @param klines - 待过滤的 K 线数组(可能乱序、可能含缺口) + * @param interval - K 线周期,用于查表获取 intervalMs + * @returns 从首条开始严格连续的最大前缀子序列;空数组无缺口时返回完整排序结果 + */ +function filterConsecutive(klines: Kline[], interval: KlineInterval) { + // 查表获取当前 K 线周期对应的毫秒数 + const intervalMs = KLINE_INTERVAL_MS[interval]; + + // 防御性排序:Binance API 不保证返回顺序,升序排列确保时间单调 + const results = klines.sort((a: Kline, b: Kline) => { + return a.openTime - b.openTime; + }); + + return results; + + // console.log(results); + + // let _openTime = 0; // 哨兵:0 表示尚未初始化,非 0 表示上一条已收录 K 线的 openTime + // const rets: Kline[] = []; // 累积连续 K 线结果 + + // for (let item of results) { + + // console.log(item.openTime); + // // 分支 1 —— 首条 K 线:无条件收录,并初始化哨兵 + // if (_openTime === 0) { + // _openTime = item.openTime; + // rets.push(item); + // continue; + // } + + // // 分支 2 —— 严格连续:当前 openTime 与上一条恰好相差一个周期 + // if (item.openTime - _openTime === intervalMs) { + // _openTime = item.openTime; + // rets.push(item); + // continue; + // } + + // // 分支 3 —— 检测到缺口:截断,丢弃当前及之后所有 K 线 + // break; + // } + + // return rets; +} + +// ============================================================ +// Client —— 多交易所 REST 客户端 +// ============================================================ + +export class Client extends BaseRestClient { + exchange: string; + + /** + * @param exchange - 交易所 ID(如 "binance"、"okx"、"bybit") + * 内部根据 ID 分发到对应的 SDK 实现 + */ + constructor(exchange: string) { + super(); + this.exchange = exchange; + } + + /** + * 拉取历史 K 线数据,返回标准化 Kline 数组。 + * + * 根据交易所 ID 分发到各自的 SDK 拉取函数。 + * + * @param symbol - 交易对符号(如 BTCUSDT) + * @param interval - K 线周期 + * @param startTime - 起始时间(Unix ms) + * @param endTime - 结束时间(Unix ms),可选 + * @param limit - 最大返回条数,默认取自 config.defaultLimit + */ + async fetchKlines( + symbol: string, + interval: KlineInterval, + startTime: number, + limit?: number, + endTime?: number, + ): Promise { + const effectiveLimit = limit ?? this.config.defaultLimit; + switch (this.exchange) { + case "binance": + return fetchBinanceKlines(symbol, interval, startTime, endTime, effectiveLimit); + // TODO: 新增交易所在此添加 case + // case "okx": + // return fetchOkxKlines(symbol, interval, startTime, endTime, effectiveLimit); + default: + throw new Error( + `[Client] 不支持的交易所: "${this.exchange}",` + + `当前仅支持: binance`, + ); + } + } + + async fetchMarkets(): Promise { + // TODO: 各交易所实现 + return []; + } +} diff --git a/data/package.json b/data/package.json index 5542981..b0f77ee 100644 --- a/data/package.json +++ b/data/package.json @@ -4,13 +4,14 @@ "description": "数字货币量化交易系统 - TypeScript 数据模块", "type": "module", "scripts": { - "dev": "tsx watch src/index.ts", + "dev": "bun run run/main.ts", + "dev:watch": "bun --watch run/main.ts", "build": "tsc", - "start": "node dist/index.js", + "start": "bun run run/main.ts", "test": "vitest run", "test:watch": "vitest", - "lint": "eslint src/", - "format": "prettier --write src/" + "lint": "eslint .", + "format": "prettier --write ." }, "dependencies": { "@timescaledb/typeorm": "^0.0.1", @@ -19,6 +20,7 @@ "ioredis": "^5.11.1", "pg": "^8.21.0", "pino": "^10.3.1", + "pino-pretty": "^13.1.3", "rxjs": "^7.8.2", "typeorm": "^1.0.0", "yaml": "^2.9.0" diff --git a/data/run/exchange.ts b/data/run/exchange.ts new file mode 100644 index 0000000..11abe64 --- /dev/null +++ b/data/run/exchange.ts @@ -0,0 +1,44 @@ +import { logger } from "../utils/logger"; +import { getAllPairs, updatePairLastBackfillTime } from '../service/pair'; +import { upsertOrUpdateKlines } from "../service/kline"; +import { Client } from '../exchanges/rest'; + +function getNowMinuteMS() { + const minuteMS = 1000 * 60; + return Math.floor(Date.now() / minuteMS) * minuteMS +} + +const allPairs = await getAllPairs(); + +for (const pair of allPairs) { + const client = new Client("binance"); + let lastBackfillTime = pair.last_backfill_time.getTime(); + try { + while (lastBackfillTime < getNowMinuteMS()) { + console.log('lastBackfillTime', lastBackfillTime); + const klines = await client.fetchKlines( + pair.symbol, + pair.kline_interval, + lastBackfillTime, + 500 + ); + console.log(`拉取到 ${klines.length} 条 K 线`); + if (klines.length > 0) { + await upsertOrUpdateKlines(klines); + const lastK = klines[klines.length - 1]; + if (lastK) { + await updatePairLastBackfillTime(lastK?.symbol, new Date(lastK.openTime)); + if (lastBackfillTime === lastK.openTime) { + break; + } + lastBackfillTime = lastK.openTime; + } + } + await new Promise((resolve) => { + setTimeout(resolve, Math.random() * 1000); + }); + } + } catch (err) { + console.error("拉取失败:", err); + } +} \ No newline at end of file diff --git a/data/service/kline.ts b/data/service/kline.ts new file mode 100644 index 0000000..640f906 --- /dev/null +++ b/data/service/kline.ts @@ -0,0 +1,73 @@ +import { AppDataSource } from "../db/data-source"; +import { Kline } from "../db/entities/kline.entity"; +import type { Kline as KlineItem } from "../types"; +import { logger } from "../utils/logger"; + +const repo = AppDataSource.getRepository(Kline); + +/** + * 批量 UPSERT K 线数据到 TimescaleDB。 + * + * 映射应用层 KlineItem → 数据库实体,通过 INSERT ... ON CONFLICT DO UPDATE + * 实现幂等写入。冲突列为 [exchange, symbol, interval, time](四列复合主键), + * 冲突时更新 OHLCV 及扩展字段。 + * + * 适用场景: + * - 回补历史 K 线(幂等,重复拉取不产生重复行) + * - WebSocket 实时 K 线增量刷新(更新最新一根未闭合 K 线的 high/low/close/volume) + * + * 注意:依赖 Kline 实体的四列复合主键 [exchange, symbol, interval, time]。 + * 若实体 PK 结构变更,需同步更新 conflictPaths。 + * + * @param KlineItems - 应用层标准化 K 线数组 + */ +export async function upsertOrUpdateKlines(KlineItems: KlineItem[]) { + if (KlineItems.length === 0) { + return; + } + + logger.debug({ count: KlineItems.length }, "开始批量 UPSERT K 线"); + + // 应用层 KlineItem → 数据库实体 Kline + // 注意类型转换:应用层价格为 string(兼容交易所 SDK),DB 层为 NUMERIC(number) + const entities = KlineItems.map((item) => { + const entity = new Kline(); + entity.time = new Date(item.openTime); // Unix ms → Date + entity.exchange = item.exchange; + entity.symbol = item.symbol; + entity.interval = item.interval; + entity.open = Number(item.open); + entity.high = Number(item.high); + entity.low = Number(item.low); + entity.close = Number(item.close); + entity.volume = Number(item.volume); + entity.quote_volume = item.quoteVolume ? Number(item.quoteVolume) : undefined; + entity.taker_buy_base_vol = item.takerBuyBaseVol + ? Number(item.takerBuyBaseVol) + : undefined; + entity.taker_buy_quote_vol = item.takerBuyQuoteVol + ? Number(item.takerBuyQuoteVol) + : undefined; + entity.trade_count = item.tradeCount ? Number(item.tradeCount) : undefined; + entity.is_closed = item.isClosed; + return entity; + }); + + try { + // UPSERT: 冲突列匹配复合主键 [exchange, symbol, interval, time] + // 实体已改为四列复合 PK,ON CONFLICT 直接命中主键约束 + // skipUpdateIfNoValuesChanged: 减少不必要的写操作 + const result = await repo.upsert(entities, { + conflictPaths: ["exchange", "symbol", "interval", "time"], + skipUpdateIfNoValuesChanged: true, + }); + + logger.info( + { count: KlineItems.length, generatedMaps: result.generatedMaps.length }, + "K 线 UPSERT 完成", + ); + } catch (err) { + logger.error({ err, count: KlineItems.length }, "K 线 UPSERT 失败"); + throw err; + } +} \ No newline at end of file diff --git a/data/service/pair.ts b/data/service/pair.ts new file mode 100644 index 0000000..9287e22 --- /dev/null +++ b/data/service/pair.ts @@ -0,0 +1,27 @@ +import { AppDataSource } from "../db/data-source"; +import { TradingPair } from "../db/entities/trading-pair.entity"; + +const repo = AppDataSource.getRepository(TradingPair); + +export async function getAllPairs() { + const pairs = await repo.find({}); + return pairs; +} + +export async function getPairLastBackfillTime(symbol: string) { + const pair = await repo.findOneBy({ + symbol + }); + return pair?.last_backfill_time; +} + +export async function updatePairLastBackfillTime(symbol: string, time: Date) { + const pair = await repo.findOneBy({ + symbol + }); + if (pair === null) { + return; + } + pair.last_backfill_time = time; + return pair.save(); +} diff --git a/data/exchanges/types.ts b/data/types/base.ts similarity index 90% rename from data/exchanges/types.ts rename to data/types/base.ts index 88d9173..1c341a1 100644 --- a/data/exchanges/types.ts +++ b/data/types/base.ts @@ -13,32 +13,7 @@ import type { Observable } from "rxjs"; -// ============================================================ -// K 线周期 -// ============================================================ - -/** K 线周期枚举(与 kline.entity.ts 中 KlineInterval 保持一致) */ -export type KlineInterval = - | "1m" - | "5m" - | "15m" - | "30m" - | "1h" - | "4h" - | "1d" - | "1w"; - -/** K 线周期 → 毫秒数映射(用于时间桶计算) */ -export const KLINE_INTERVAL_MS: Record = { - "1m": 60_000, - "5m": 300_000, - "15m": 900_000, - "30m": 1_800_000, - "1h": 3_600_000, - "4h": 14_400_000, - "1d": 86_400_000, - "1w": 604_800_000, -}; +import type { KlineInterval } from "./kline"; // ============================================================ // 标准化行情数据结构 @@ -129,23 +104,23 @@ export interface Kline { /** 收盘时间(Unix ms) */ closeTime: number; /** 开盘价 */ - open: number; + open: string; /** 最高价 */ - high: number; + high: string; /** 最低价 */ - low: number; + low: string; /** 收盘价 */ - close: number; + close: string; /** 成交量(base 币种) */ - volume: number; + volume: string; /** 成交额(quote 币种) */ - quoteVolume: number; + quoteVolume: string; /** 主动买入成交量(base 币种) */ - takerBuyBaseVol: number; + takerBuyBaseVol: string; /** 主动买入成交额(quote 币种) */ - takerBuyQuoteVol: number; + takerBuyQuoteVol: string; /** 成交笔数 */ - tradeCount: number; + tradeCount: string; /** 该 K 线是否已关闭(不再更新) */ isClosed: boolean; } @@ -177,24 +152,40 @@ export type ConnectionState = | "error"; // ============================================================ -// 适配器配置 +// REST 客户端配置 // ============================================================ -/** 交易所适配器通用配置 */ -export interface AdapterConfig { +/** REST 客户端通用配置(各交易所 SDK 共用) */ +export interface RestClientConfig { + /** REST API 请求冷却时间(毫秒),默认 200 */ + restRateLimitMs: number; + /** 单次请求默认拉取条数 */ + defaultLimit: number; +} + +/** 默认 REST 客户端配置 */ +export const DEFAULT_REST_CONFIG: RestClientConfig = { + restRateLimitMs: 200, + defaultLimit: 500, +}; + +// ============================================================ +// 适配器配置(含 WebSocket 重连) +// ============================================================ + +/** 交易所适配器完整配置(REST + WebSocket) */ +export interface AdapterConfig extends RestClientConfig { /** 指数退避重连基数(毫秒),默认 3000 */ reconnectBaseDelayMs: number; /** 最大重连次数,默认 10 */ maxReconnectAttempts: number; - /** REST API 请求冷却时间(毫秒),默认 200 */ - restRateLimitMs: number; } /** 默认适配器配置 */ export const DEFAULT_ADAPTER_CONFIG: AdapterConfig = { + ...DEFAULT_REST_CONFIG, reconnectBaseDelayMs: 3000, maxReconnectAttempts: 10, - restRateLimitMs: 200, }; // ============================================================ diff --git a/data/types/index.ts b/data/types/index.ts new file mode 100644 index 0000000..729f04c --- /dev/null +++ b/data/types/index.ts @@ -0,0 +1,3 @@ +export type * from './kline'; +export type * from './base'; +export * from './base'; \ No newline at end of file diff --git a/data/types/kline.ts b/data/types/kline.ts new file mode 100644 index 0000000..ddc4ecf --- /dev/null +++ b/data/types/kline.ts @@ -0,0 +1,10 @@ +/** K 线周期枚举 */ +export type KlineInterval = + | "1m" + | "5m" + | "15m" + | "30m" + | "1h" + | "4h" + | "1d" + | "1w";