Files
Rekey 039bfb5075 feat(engine): 添加核心基础设施 — engine/common 模块
- engine/__init__.py: 包入口,导出 Kline/KlineInterval/OrderBook/Ticker/Trade
- common/base.py: BaseStrategy 抽象基类,定义 on_kline/on_ticker/on_orderbook 回调
- common/models.py: Pydantic 数据模型,与 TS 侧 types 字段对齐,支持字段校验
- common/config.py: 全局配置加载(YAML),统一 engine/env.yaml 读取
- common/logger.py: 结构化日志,支持 JSON/pretty print 输出
2026-06-12 10:26:37 +08:00

227 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
策略引擎核心模块 — 策略基类、配置与信号定义
提供策略开发的抽象基类和核心数据类型,所有策略必须继承 BaseStrategy。
"""
from abc import ABC, abstractmethod
from typing import Optional
from pydantic import BaseModel
from .models import Kline, OrderBook, Ticker, Trade
# ============================================================
# 配置模型
# ============================================================
class StrategyConfig(BaseModel):
"""策略配置基类
由具体策略子类化扩展,添加策略专属参数。
子类示例:
class MACrossConfig(StrategyConfig):
fast_period: int = 7
slow_period: int = 25
"""
name: str = ""
symbol: str = ""
exchange: str = "binance"
enabled: bool = True
max_position_pct: float = 0.1 # 最大仓位占总资金比例
stop_loss_pct: Optional[float] = None # 止损百分比
take_profit_pct: Optional[float] = None # 止盈百分比
cooldown_seconds: float = 1.0 # 下单冷却时间(秒)
min_confidence: float = 0.3 # 最低信号置信度
# ============================================================
# 信号模型
# ============================================================
class Signal(BaseModel):
"""交易信号
策略通过返回 Signal 来表达交易意图,由信号分发器路由到交易执行模块。
"""
symbol: str
side: str # "BUY" / "SELL"
signal_type: str = "MARKET" # "MARKET" / "LIMIT" / "CANCEL"
price: Optional[float] = None # 限价单价格(signal_type=LIMIT 时必填)
quantity: Optional[float] = None # 下单数量(None 表示按仓位比例计算)
confidence: float = 1.0 # 信号置信度 [0, 1]
reason: str = "" # 信号生成原因(便于审计和调试)
timestamp: float = 0.0 # 信号时间戳(Unix 毫秒)
# ============================================================
# 策略基类
# ============================================================
class BaseStrategy(ABC):
"""所有策略的抽象基类
子类必须:
1. 覆盖 strategy_type 类属性(唯一标识)
2. 实现 on_kline() 方法
3. 如有需要,覆盖其他 on_* 方法
生命周期:
register → create → on_start() → [事件循环] → on_stop() → unload
用法示例:
class MyStrategy(BaseStrategy):
strategy_type = "my_strategy"
async def on_kline(self, kline):
if kline.close > self._ma:
return Signal(symbol=self.config.symbol, side="BUY",
reason="Price above MA", timestamp=kline.open_time)
return None
"""
# 策略类型标识,子类必须覆盖为唯一字符串
strategy_type: str = "base"
def __init__(self, config: StrategyConfig):
self.config = config
self.is_running: bool = False
self.pnl: float = 0.0
self.trade_count: int = 0
# ── 属性 ──
@property
def name(self) -> str:
"""策略名称(来自 config.name"""
return self.config.name
# ── 生命周期钩子 ──
async def on_start(self) -> None:
"""策略启动时调用。
用于加载初始状态、从数据库恢复持仓信息、预热指标等。
调用时机:策略实例创建后首次启动,或从暂停状态恢复时。
"""
self.is_running = True
async def on_stop(self) -> None:
"""策略停止时调用。
用于清理资源、平仓(可选)、保存状态等。
调用时机:手动停止、系统关闭、策略热更新前。
"""
self.is_running = False
async def on_pause(self) -> None:
"""策略暂停时调用。
暂停期间策略不再接收行情数据,但保留内部状态。
调用时机:手动暂停、风控触发暂停。
"""
pass
async def on_resume(self) -> None:
"""策略恢复时调用。
从暂停状态恢复,重新开始接收行情数据。
调用时机:手动恢复、风控解除。
"""
pass
# ── 行情事件处理器 ──
@abstractmethod
async def on_kline(self, kline: Kline) -> Optional[Signal]:
"""处理 K 线数据。
K 线是最主要的行情输入,子类必须实现此方法。
返回 Signal 表示产生交易意图,返回 None 表示不交易。
Args:
kline: engine.common.models.Kline 实例
Returns:
Signal | None
"""
...
async def on_ticker(self, ticker: Ticker) -> Optional[Signal]:
"""处理 Ticker 数据(可选实现)。
对于高频或需要实时价格的策略,可覆盖此方法。
"""
return None
async def on_trade(self, trade: Trade) -> Optional[Signal]:
"""处理逐笔成交数据(可选实现)。
用于需要分析成交量分布、大单检测等场景。
"""
return None
async def on_orderbook(self, orderbook: OrderBook) -> Optional[Signal]:
"""处理订单簿深度数据(可选实现)。
用于需要分析盘口深度、流动性等的策略。
"""
return None
# ── 交易反馈钩子 ──
async def on_order_filled(self, order_result) -> None:
"""订单成交回调。
当策略发出的订单被成交后,交易执行模块通过此钩子通知策略。
策略可据此更新内部状态(持仓、盈亏等)。
Args:
order_result: 订单成交结果(字段取决于 executor 模块定义)
"""
pass
async def on_order_cancelled(self, order_id: str) -> None:
"""订单撤销回调。
Args:
order_id: 被撤销的订单 ID
"""
pass
async def on_order_rejected(self, order_id: str, reason: str) -> None:
"""订单被拒回调。
当风控检查不通过或交易所拒绝订单时触发。
Args:
order_id: 订单 ID
reason: 拒绝原因
"""
pass
# ── 辅助方法 ──
def log(self, level: str, message: str, **kwargs) -> None:
"""结构化日志输出。
策略内应使用此方法而非直接 print 或使用 logging
以确保日志格式统一、可被监控系统采集。
Args:
level: debug / info / warning / error
message: 日志消息
**kwargs: 附加的结构化字段(如 price=50000, signal="BUY"
"""
from .logger import logger
log_method = getattr(logger, level, logger.info)
extras = " | ".join(f"{k}={v}" for k, v in kwargs.items()) if kwargs else ""
full_msg = f"[{self.name}] {message}"
if extras:
full_msg += f" | {extras}"
log_method(full_msg)