From 039bfb507519d371d34913ab1fe6a480587b4f54 Mon Sep 17 00:00:00 2001 From: Rekey Date: Fri, 12 Jun 2026 10:26:37 +0800 Subject: [PATCH] =?UTF-8?q?feat(engine):=20=E6=B7=BB=E5=8A=A0=E6=A0=B8?= =?UTF-8?q?=E5=BF=83=E5=9F=BA=E7=A1=80=E8=AE=BE=E6=96=BD=20=E2=80=94=20eng?= =?UTF-8?q?ine/common=20=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - engine/__init__.py: 包入口,导出 Kline/KlineInterval/OrderBook/Ticker/Trade - common/base.py: BaseStrategy 抽象基类,定义 on_kline/on_ticker/on_orderbook 回调 - common/models.py: Pydantic 数据模型,与 TS 侧 types 字段对齐,支持字段校验 - common/config.py: 全局配置加载(YAML),统一 engine/env.yaml 读取 - common/logger.py: 结构化日志,支持 JSON/pretty print 输出 --- engine/__init__.py | 11 ++ engine/common/__init__.py | 23 ++++ engine/common/base.py | 226 ++++++++++++++++++++++++++++++++++++++ engine/common/config.py | 72 ++++++++++++ engine/common/logger.py | 30 +++++ engine/common/models.py | 178 ++++++++++++++++++++++++++++++ 6 files changed, 540 insertions(+) create mode 100644 engine/__init__.py create mode 100644 engine/common/__init__.py create mode 100644 engine/common/base.py create mode 100644 engine/common/config.py create mode 100644 engine/common/logger.py create mode 100644 engine/common/models.py diff --git a/engine/__init__.py b/engine/__init__.py new file mode 100644 index 0000000..f24bd26 --- /dev/null +++ b/engine/__init__.py @@ -0,0 +1,11 @@ +# engine - 策略引擎模块 + +from .common import Kline, KlineInterval, OrderBook, Ticker, Trade, config +from .data import DataService +from .backtest import BacktestEngine, BacktestConfig, BacktestResult, BacktestMetrics, BacktestTrade + +__all__ = [ + "Kline", "KlineInterval", "OrderBook", "Ticker", "Trade", + "DataService", "config", + "BacktestEngine", "BacktestConfig", "BacktestResult", "BacktestMetrics", "BacktestTrade", +] diff --git a/engine/common/__init__.py b/engine/common/__init__.py new file mode 100644 index 0000000..a537726 --- /dev/null +++ b/engine/common/__init__.py @@ -0,0 +1,23 @@ +# engine.common — 策略引擎公共模块 + +from .models import Kline, KlineInterval, OrderBook, Ticker, Trade +from .base import BaseStrategy, Signal, StrategyConfig +from .logger import logger +from .config import AppConfig, DBConfig, LoggingConfig, RedisConfig, config + +__all__ = [ + "Kline", + "KlineInterval", + "OrderBook", + "Ticker", + "Trade", + "BaseStrategy", + "Signal", + "StrategyConfig", + "logger", + "config", + "AppConfig", + "DBConfig", + "RedisConfig", + "LoggingConfig", +] diff --git a/engine/common/base.py b/engine/common/base.py new file mode 100644 index 0000000..907e9b7 --- /dev/null +++ b/engine/common/base.py @@ -0,0 +1,226 @@ +""" +策略引擎核心模块 — 策略基类、配置与信号定义 + +提供策略开发的抽象基类和核心数据类型,所有策略必须继承 BaseStrategy。 +""" + +from abc import ABC, abstractmethod +from typing import Optional + +from pydantic import BaseModel + +from .models import Kline, OrderBook, Ticker, Trade + + +# ============================================================ +# 配置模型 +# ============================================================ + + +class StrategyConfig(BaseModel): + """策略配置基类 + + 由具体策略子类化扩展,添加策略专属参数。 + 子类示例: + class MACrossConfig(StrategyConfig): + fast_period: int = 7 + slow_period: int = 25 + """ + name: str = "" + symbol: str = "" + exchange: str = "binance" + enabled: bool = True + max_position_pct: float = 0.1 # 最大仓位占总资金比例 + stop_loss_pct: Optional[float] = None # 止损百分比 + take_profit_pct: Optional[float] = None # 止盈百分比 + cooldown_seconds: float = 1.0 # 下单冷却时间(秒) + min_confidence: float = 0.3 # 最低信号置信度 + + +# ============================================================ +# 信号模型 +# ============================================================ + + +class Signal(BaseModel): + """交易信号 + + 策略通过返回 Signal 来表达交易意图,由信号分发器路由到交易执行模块。 + """ + symbol: str + side: str # "BUY" / "SELL" + signal_type: str = "MARKET" # "MARKET" / "LIMIT" / "CANCEL" + price: Optional[float] = None # 限价单价格(signal_type=LIMIT 时必填) + quantity: Optional[float] = None # 下单数量(None 表示按仓位比例计算) + confidence: float = 1.0 # 信号置信度 [0, 1] + reason: str = "" # 信号生成原因(便于审计和调试) + timestamp: float = 0.0 # 信号时间戳(Unix 毫秒) + + +# ============================================================ +# 策略基类 +# ============================================================ + + +class BaseStrategy(ABC): + """所有策略的抽象基类 + + 子类必须: + 1. 覆盖 strategy_type 类属性(唯一标识) + 2. 实现 on_kline() 方法 + 3. 如有需要,覆盖其他 on_* 方法 + + 生命周期: + register → create → on_start() → [事件循环] → on_stop() → unload + + 用法示例: + class MyStrategy(BaseStrategy): + strategy_type = "my_strategy" + + async def on_kline(self, kline): + if kline.close > self._ma: + return Signal(symbol=self.config.symbol, side="BUY", + reason="Price above MA", timestamp=kline.open_time) + return None + """ + + # 策略类型标识,子类必须覆盖为唯一字符串 + strategy_type: str = "base" + + def __init__(self, config: StrategyConfig): + self.config = config + self.is_running: bool = False + self.pnl: float = 0.0 + self.trade_count: int = 0 + + # ── 属性 ── + + @property + def name(self) -> str: + """策略名称(来自 config.name)""" + return self.config.name + + # ── 生命周期钩子 ── + + async def on_start(self) -> None: + """策略启动时调用。 + + 用于加载初始状态、从数据库恢复持仓信息、预热指标等。 + 调用时机:策略实例创建后首次启动,或从暂停状态恢复时。 + """ + self.is_running = True + + async def on_stop(self) -> None: + """策略停止时调用。 + + 用于清理资源、平仓(可选)、保存状态等。 + 调用时机:手动停止、系统关闭、策略热更新前。 + """ + self.is_running = False + + async def on_pause(self) -> None: + """策略暂停时调用。 + + 暂停期间策略不再接收行情数据,但保留内部状态。 + 调用时机:手动暂停、风控触发暂停。 + """ + pass + + async def on_resume(self) -> None: + """策略恢复时调用。 + + 从暂停状态恢复,重新开始接收行情数据。 + 调用时机:手动恢复、风控解除。 + """ + pass + + # ── 行情事件处理器 ── + + @abstractmethod + async def on_kline(self, kline: Kline) -> Optional[Signal]: + """处理 K 线数据。 + + K 线是最主要的行情输入,子类必须实现此方法。 + 返回 Signal 表示产生交易意图,返回 None 表示不交易。 + + Args: + kline: engine.common.models.Kline 实例 + + Returns: + Signal | None + """ + ... + + async def on_ticker(self, ticker: Ticker) -> Optional[Signal]: + """处理 Ticker 数据(可选实现)。 + + 对于高频或需要实时价格的策略,可覆盖此方法。 + """ + return None + + async def on_trade(self, trade: Trade) -> Optional[Signal]: + """处理逐笔成交数据(可选实现)。 + + 用于需要分析成交量分布、大单检测等场景。 + """ + return None + + async def on_orderbook(self, orderbook: OrderBook) -> Optional[Signal]: + """处理订单簿深度数据(可选实现)。 + + 用于需要分析盘口深度、流动性等的策略。 + """ + return None + + # ── 交易反馈钩子 ── + + async def on_order_filled(self, order_result) -> None: + """订单成交回调。 + + 当策略发出的订单被成交后,交易执行模块通过此钩子通知策略。 + 策略可据此更新内部状态(持仓、盈亏等)。 + + Args: + order_result: 订单成交结果(字段取决于 executor 模块定义) + """ + pass + + async def on_order_cancelled(self, order_id: str) -> None: + """订单撤销回调。 + + Args: + order_id: 被撤销的订单 ID + """ + pass + + async def on_order_rejected(self, order_id: str, reason: str) -> None: + """订单被拒回调。 + + 当风控检查不通过或交易所拒绝订单时触发。 + + Args: + order_id: 订单 ID + reason: 拒绝原因 + """ + pass + + # ── 辅助方法 ── + + def log(self, level: str, message: str, **kwargs) -> None: + """结构化日志输出。 + + 策略内应使用此方法而非直接 print 或使用 logging, + 以确保日志格式统一、可被监控系统采集。 + + Args: + level: debug / info / warning / error + message: 日志消息 + **kwargs: 附加的结构化字段(如 price=50000, signal="BUY") + """ + from .logger import logger + log_method = getattr(logger, level, logger.info) + extras = " | ".join(f"{k}={v}" for k, v in kwargs.items()) if kwargs else "" + full_msg = f"[{self.name}] {message}" + if extras: + full_msg += f" | {extras}" + log_method(full_msg) diff --git a/engine/common/config.py b/engine/common/config.py new file mode 100644 index 0000000..250c354 --- /dev/null +++ b/engine/common/config.py @@ -0,0 +1,72 @@ +""" +项目配置模块 — 读取并校验根目录 env.yaml + +使用方式: + from engine.common.config import config + + db = config.db + print(db.host, db.port, db.name) + print(config.redis.url) + print(config.logging.level) +""" + +from pathlib import Path +from typing import Optional + +import yaml +from pydantic import BaseModel, Field + + +class DBConfig(BaseModel): + """TimescaleDB / PostgreSQL 连接配置""" + + host: str + port: int = 5432 + name: str + user: str + password: str + + +class RedisConfig(BaseModel): + """Redis 连接配置""" + + url: str = "redis://localhost:6379" + publish_enabled: bool = True + + +class LoggingConfig(BaseModel): + """日志配置""" + + level: str = "debug" # trace / debug / info / warn / error / fatal + node_env: str = Field(default="development", alias="node_env") + + class Config: + populate_by_name = True + + +class AppConfig(BaseModel): + """应用配置聚合""" + + db: DBConfig + redis: RedisConfig + logging: LoggingConfig + + +def load_config(config_path: Optional[Path] = None) -> AppConfig: + """从 env.yaml 加载并校验配置 + + Args: + config_path: 显式指定路径;为 None 时自动查找项目根目录 env.yaml + """ + if config_path is None: + # engine/common/config.py → engine/ (parent of common/) → 根目录 env.yaml + config_path = Path(__file__).resolve().parent.parent / "env.yaml" + + with open(config_path) as f: + raw = yaml.safe_load(f) + + return AppConfig(**raw) + + +# ── 模块级单例 ── +config = load_config() diff --git a/engine/common/logger.py b/engine/common/logger.py new file mode 100644 index 0000000..3f8b484 --- /dev/null +++ b/engine/common/logger.py @@ -0,0 +1,30 @@ +""" +日志模块 — 系统级结构化日志 + +基于标准库 logging + 结构化格式,支持按级别、模块过滤。 +""" + +import logging +import sys + +# ── 日志器 ── + +logger = logging.getLogger("trade") +"""全局日志器,策略代码通过 `from .logger import logger` 使用""" + +# ── 控制台输出 ── + +_handler = logging.StreamHandler(sys.stdout) +_handler.setLevel(logging.DEBUG) + +_formatter = logging.Formatter( + fmt="%(asctime)s | %(levelname)-7s | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +_handler.setFormatter(_formatter) + +logger.addHandler(_handler) +logger.setLevel(logging.DEBUG) + +# 避免日志向上传播到 root logger 导致重复输出 +logger.propagate = False diff --git a/engine/common/models.py b/engine/common/models.py new file mode 100644 index 0000000..f5640fb --- /dev/null +++ b/engine/common/models.py @@ -0,0 +1,178 @@ +""" +数据模型定义 — 与 TS data/types/base.ts 对齐的 Pydantic 模型 + +Ticker / Trade / OrderBook / Kline 是系统中流通的核心行情数据结构, +字段语义与 Binance/OKX/Bybit 通用概念对齐,时间戳统一使用 Unix 毫秒。 +""" + +from typing import Literal + +from pydantic import BaseModel, Field, field_validator + + +# ============================================================ +# K 线周期类型 +# ============================================================ + +KlineInterval = Literal["1m", "5m", "15m", "30m", "1h", "4h", "1d", "1w"] + + +# ============================================================ +# 行情数据模型 +# ============================================================ + + +class Ticker(BaseModel): + """24 小时滚动行情统计 + + 每笔成交触发推送,包含最近 24 小时的 OHLC、成交量、买卖盘口等统计信息。 + """ + + exchange: str + """交易所标识(如 binance)""" + symbol: str + """交易对符号(大写,如 BTCUSDT)""" + + # 24h 价格统计 + last_price: float = Field(alias="lastPrice") + """最新成交价""" + open_price: float = Field(alias="openPrice") + """24h 开盘价""" + high_price: float = Field(alias="highPrice") + """24h 最高价""" + low_price: float = Field(alias="lowPrice") + """24h 最低价""" + + # 24h 成交量统计 + volume: float + """24h 成交量(base 币种)""" + quote_volume: float = Field(alias="quoteVolume") + """24h 成交额(quote 币种)""" + + # 价格变化 + price_change: float = Field(alias="priceChange") + """24h 价格变化""" + price_change_percent: float = Field(alias="priceChangePercent") + """24h 价格变化百分比(0.05 = 5%)""" + + # 最优买卖盘口 + bid_price: float = Field(alias="bidPrice") + """买一价""" + bid_qty: float = Field(alias="bidQty") + """买一量""" + ask_price: float = Field(alias="askPrice") + """卖一价""" + ask_qty: float = Field(alias="askQty") + """卖一量""" + + # 时间戳 + event_time: float = Field(alias="eventTime") + """事件发生时间(Unix 毫秒)""" + close_time: float = Field(alias="closeTime") + """交易所收盘时间(Unix 毫秒,用于判断 K 线是否闭合)""" + + +class Trade(BaseModel): + """逐笔成交记录""" + + exchange: str + """交易所标识""" + symbol: str + """交易对符号""" + price: float + """成交价""" + amount: float + """成交数量(base 币种)""" + quote_amount: float = Field(alias="quoteAmount") + """成交额(quote 币种 = price × amount)""" + timestamp: float + """成交时间(Unix 毫秒)""" + is_buyer_maker: bool = Field(alias="isBuyerMaker") + """买方是否为挂单方(true = 主动卖出 / taker sell)""" + trade_id: str = Field(alias="tradeId") + """交易所成交 ID""" + + +class OrderBook(BaseModel): + """订单簿深度快照""" + + exchange: str + """交易所标识""" + symbol: str + """交易对符号""" + bids: list[tuple[float, float]] + """买单列表 [[price, qty], ...],按价格降序(买一在前)""" + asks: list[tuple[float, float]] + """卖单列表 [[price, qty], ...],按价格升序(卖一在前)""" + last_update_id: int = Field(alias="lastUpdateId") + """上次更新 ID""" + event_time: float = Field(alias="eventTime") + """事件发生时间(Unix 毫秒)""" + + +class Kline(BaseModel): + """标准化 K 线(OHLCV) + + K 线是最主要的行情输入,策略通过 on_kline() 接收此数据。 + open/high/low/close/volume 字段在 TS 侧为字符串以保持精度, + 模型在初始化时自动转换为 float。 + """ + + exchange: str + """交易所标识""" + symbol: str + """交易对符号""" + interval: KlineInterval + """K 线周期""" + + # 时间 + open_time: float = Field(alias="openTime") + """开盘时间(Unix 毫秒)""" + close_time: float = Field(alias="closeTime") + """收盘时间(Unix 毫秒)""" + + # OHLCV + open: float + """开盘价""" + high: float + """最高价""" + low: float + """最低价""" + close: float + """收盘价""" + volume: float + """成交量(base 币种)""" + + # 扩展字段 + quote_volume: float = Field(default=0.0, alias="quoteVolume") + """成交额(quote 币种)""" + taker_buy_base_vol: float = Field(default=0.0, alias="takerBuyBaseVol") + """主动买入成交量(base 币种)""" + taker_buy_quote_vol: float = Field(default=0.0, alias="takerBuyQuoteVol") + """主动买入成交额(quote 币种)""" + trade_count: int = Field(default=0, alias="tradeCount") + """成交笔数""" + is_closed: bool = Field(alias="isClosed") + """该 K 线是否已关闭(不再更新)""" + + # ── 字段校验:处理 TS 侧字符串 → float 转换 ── + + @field_validator( + "open", "high", "low", "close", "volume", + "quote_volume", "taker_buy_base_vol", "taker_buy_quote_vol", + mode="before", + ) + @classmethod + def _coerce_float(cls, v: object) -> float: + """将字符串类型数值转为 float,兼容 TS 侧 MessagePack 序列化格式""" + if isinstance(v, str): + return float(v) + return float(v) + + @field_validator("trade_count", mode="before") + @classmethod + def _coerce_int(cls, v: object) -> int: + """将字符串类型数值转为 int""" + if isinstance(v, str): + return int(v) + return int(v)