""" 增量指标 — O(1) 每 bar 更新,避免每次从头重算整条序列 策略在 on_kline 中对每根 bar 调用 update(),内部只计算增量值, 对外暴露 values 属性(完整序列,支持索引回溯),兼顾性能与易用性。 用法: from engine.indicators.incremental import EmaInc, AtrInc e200 = EmaInc(200) for price in prices: e200.update(price) print(e200[-1]) # 最新 EMA 值 print(e200[-20]) # 20 根前的 EMA 值(斜率计算用) """ from typing import Optional class EmaInc: """增量 EMA 内部维护完整序列,update() 为 O(1),values 为 list[float] 可直接索引。 """ def __init__(self, period: int): self.period = period self.k = 2.0 / (period + 1) self._values: list[float] = [] self._warm: list[float] = [] self._ready = False def update(self, price: float) -> float: """输入新价格,返回最新 EMA 值(不足周期时返回 0)""" if not self._ready: self._warm.append(price) self._values.append(0.0) if len(self._warm) == self.period: self._values[-1] = sum(self._warm) / self.period self._warm.clear() self._ready = True return self._values[-1] return 0.0 val = price * self.k + self._values[-1] * (1 - self.k) self._values.append(val) return val @property def values(self) -> list[float]: return self._values @property def current(self) -> float: return self._values[-1] if self._values else 0.0 def __getitem__(self, idx: int) -> float: return self._values[idx] def __len__(self) -> int: return len(self._values) class AtrInc: """增量 ATR(Wilder 平滑) 内部维护完整序列,update() 为 O(1),values 为 list[float] 可直接索引。 """ def __init__(self, period: int = 14): self.period = period self._values: list[float] = [] self._tr_buffer: list[float] = [] self._prev_close: Optional[float] = None self._ready = False def update(self, high: float, low: float, close: float) -> float: """输入新 bar 的 HLC,返回最新 ATR 值(不足周期时返回 0)""" # 第一根 bar:记录收盘价,无法计算 TR if self._prev_close is None: self._prev_close = close self._values.append(0.0) return 0.0 tr = max(high - low, abs(high - self._prev_close), abs(low - self._prev_close)) self._prev_close = close if not self._ready: self._tr_buffer.append(tr) self._values.append(0.0) if len(self._tr_buffer) == self.period: atr_val = sum(self._tr_buffer) / self.period self._values[-1] = atr_val self._tr_buffer.clear() self._ready = True return atr_val return 0.0 # Wilder 平滑 atr_val = (self._values[-1] * (self.period - 1) + tr) / self.period self._values.append(atr_val) return atr_val @property def values(self) -> list[float]: return self._values @property def current(self) -> float: return self._values[-1] if self._values else 0.0 def __getitem__(self, idx: int) -> float: return self._values[idx] def __len__(self) -> int: return len(self._values) class RsiInc: """增量 RSI(Wilder 平滑) 内部维护完整序列,update() 为 O(1)。 """ def __init__(self, period: int = 14): self.period = period self._values: list[float] = [] self._prev_price: Optional[float] = None self._avg_gain: float = 0.0 self._avg_loss: float = 0.0 self._changes: list[float] = [] self._ready = False def update(self, price: float) -> float: if self._prev_price is None: self._prev_price = price self._values.append(0.0) return 0.0 change = price - self._prev_price self._prev_price = price if not self._ready: self._changes.append(change) self._values.append(0.0) if len(self._changes) == self.period: gains = [max(c, 0.0) for c in self._changes] losses = [abs(min(c, 0.0)) for c in self._changes] self._avg_gain = sum(gains) / self.period self._avg_loss = sum(losses) / self.period self._changes.clear() self._ready = True rs = self._avg_gain / self._avg_loss if self._avg_loss > 0 else float("inf") rsi = 100.0 - (100.0 / (1.0 + rs)) if self._avg_loss > 0 else 100.0 self._values[-1] = rsi return rsi return 0.0 gain = max(change, 0.0) loss = abs(min(change, 0.0)) self._avg_gain = (self._avg_gain * (self.period - 1) + gain) / self.period self._avg_loss = (self._avg_loss * (self.period - 1) + loss) / self.period if self._avg_loss == 0: rsi = 100.0 else: rs = self._avg_gain / self._avg_loss rsi = 100.0 - (100.0 / (1.0 + rs)) self._values.append(rsi) return rsi @property def values(self) -> list[float]: return self._values @property def current(self) -> float: return self._values[-1] if self._values else 0.0 def __getitem__(self, idx: int) -> float: return self._values[idx] def __len__(self) -> int: return len(self._values) class BbInc: """增量布林带 内部维护完整序列,update() 返回 (upper, mid, lower) 三元组。 """ def __init__(self, period: int = 20, std: float = 2.0): self.period = period self.std = std self._upper: list[float] = [] self._mid: list[float] = [] self._lower: list[float] = [] self._window: list[float] = [] self._window_sum: float = 0.0 self._window_sum_sq: float = 0.0 def update(self, price: float) -> tuple[float, float, float]: self._window.append(price) self._window_sum += price self._window_sum_sq += price * price if len(self._window) < self.period: self._upper.append(0.0) self._mid.append(0.0) self._lower.append(0.0) return 0.0, 0.0, 0.0 if len(self._window) > self.period: old = self._window.pop(0) self._window_sum -= old self._window_sum_sq -= old * old mean = self._window_sum / self.period variance = (self._window_sum_sq / self.period) - (mean * mean) stdev = max(variance, 0.0) ** 0.5 upper = mean + self.std * stdev lower = mean - self.std * stdev self._upper.append(upper) self._mid.append(mean) self._lower.append(lower) return upper, mean, lower @property def upper(self) -> list[float]: return self._upper @property def mid(self) -> list[float]: return self._mid @property def lower(self) -> list[float]: return self._lower def __len__(self) -> int: return len(self._mid)