Files
trade/engine/example/data.py
T
Rekey 515e61c517 feat(engine): 添加策略示例集(18 个 Demo)
- backtest_demo.py: 回测基础演示
- strategy_simple.py / three_ema.py / long_short.py: 基础策略(双均线/三均线/多空)
- strategy_optimize*.py (3 版本): 参数优化示例(网格搜索/贝叶斯/遗传算法)
- multi_tf_*.py (4 版本): 多时间框架策略(EMA200/多周期共振/混合信号)
- regime_*.py (4 版本): 市场状态检测(趋势/震荡/波动率区间/全状态)
- cross_section.py: 截面多品种策略
- factor_demo.py: 多因子模型演示
- strategy_battle.py / strategy_more.py: 策略对比与组合
- full_cycle.py: 全流程演示(数据→回测→分析)
- data.py: 数据读取示例
2026-06-12 10:27:04 +08:00

63 lines
1.7 KiB
Python

"""
DataService 使用示例 — 读取各周期 K 线并打印
用法:
python example/data.py
"""
import asyncio
import sys
from pathlib import Path
# 确保项目根目录在 sys.path 中,以便使用 engine 包导入
_project_root = Path(__file__).resolve().parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root))
from engine.data import DataService
from engine.common.config import config
async def main():
print(config.db)
ds = DataService(config.db)
await ds.connect()
try:
# 1. 查看可用交易对
symbols = await ds.fetch_available_symbols("1m")
print(f"可用交易对: {symbols}")
# 2. 各周期读取最新 3 条 BTCUSDT K 线
target = "BTCUSDT"
for interval in ["1m", "5m", "15m", "30m", "1h", "4h", "1d"]:
klines = await ds.fetch_klines(
symbol=target,
interval=interval,
limit=3,
)
print(f"\n{'' * 70}")
print(f" [{interval}] {target}{len(klines)}")
print(f"{'' * 70}")
for k in klines:
print(
f" {k.open_time:.0f} O={k.open:>12.4f} H={k.high:>12.4f}"
f" L={k.low:>12.4f} C={k.close:>12.4f}"
f" V={k.volume:>10.4f} trades={k.trade_count}"
)
# 3. 日期范围
start, end = await ds.fetch_symbol_date_range(target, "1d")
print(f"\n{target} 1d 数据范围: {start} ~ {end}")
print("\n完成")
finally:
await ds.close()
if __name__ == "__main__":
asyncio.run(main())