"""Trend indicators: moving averages, MACD and ADX.

Every function returns ``list[float]`` sequences of the same length as the
input. Positions for which there is not yet enough history (the warm-up
region) are filled with ``0.0``.
"""

from functools import lru_cache


def sma(data: list[float], period: int) -> list[float]:
    """Simple moving average (SMA).

    Args:
        data: Price series.
        period: Window length.

    Returns:
        A list as long as ``data``. The first ``period - 1`` positions are
        0.0. If ``period <= 0`` or ``data`` is shorter than ``period`` the
        whole result is zeros.
    """
    n = len(data)
    result = [0.0] * n
    if n < period or period <= 0:
        return result

    # Rolling sum: add the newest value and drop the one leaving the window,
    # so each step is O(1) instead of re-summing the whole window.
    window_sum = sum(data[:period])
    result[period - 1] = window_sum / period
    for i in range(period, n):
        window_sum += data[i] - data[i - period]
        result[i] = window_sum / period
    return result


def ema(data: list[float], period: int) -> list[float]:
    """Exponential moving average (EMA).

    Uses the standard EMA smoothing factor ``k = 2 / (period + 1)`` (this is
    not Wilder's ``1 / period`` smoothing). The first value, at index
    ``period - 1``, is seeded with the SMA of the first ``period`` inputs.

    Args:
        data: Price series.
        period: Smoothing period.

    Returns:
        A list as long as ``data``. The first ``period - 1`` positions are
        0.0. If ``period <= 0`` or ``data`` is shorter than ``period`` the
        whole result is zeros.
    """
    n = len(data)
    result = [0.0] * n
    if n < period or period <= 0:
        return result

    k = 2.0 / (period + 1)
    # Seed with the SMA of the first window.
    result[period - 1] = sum(data[:period]) / period
    for i in range(period, n):
        result[i] = data[i] * k + result[i - 1] * (1 - k)
    return result


def macd(
    data: list[float],
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> tuple[list[float], list[float], list[float]]:
    """MACD indicator.

    MACD line   = EMA(fast) - EMA(slow)
    Signal line = EMA(MACD line, signal)
    Histogram   = MACD line - signal line

    Warm-up positions are 0.0 in all three outputs:

    * the MACD line is only defined once *both* EMAs are available, i.e.
      from index ``max(fast, slow) - 1``;
    * the signal line is an EMA over the *valid* MACD values only, so it is
      first defined ``signal - 1`` positions later;
    * the histogram is only defined where the signal line is defined.

    Args:
        data: Price series.
        fast: Fast EMA period.
        slow: Slow EMA period.
        signal: Signal-line EMA period.

    Returns:
        ``(macd_line, signal_line, histogram)``, three lists as long as
        ``data``. If any period is ``<= 0`` or there is not enough data,
        the affected outputs are all zeros.
    """
    n = len(data)
    macd_line = [0.0] * n
    signal_line = [0.0] * n
    histogram = [0.0] * n
    if fast <= 0 or slow <= 0 or signal <= 0:
        return macd_line, signal_line, histogram

    # First index at which both EMAs hold a real value. Before this, one of
    # them is still the 0.0 placeholder and the difference is meaningless.
    start = max(fast, slow) - 1
    if n <= start:
        return macd_line, signal_line, histogram

    fast_ema = ema(data, fast)
    slow_ema = ema(data, slow)
    for i in range(start, n):
        macd_line[i] = fast_ema[i] - slow_ema[i]

    # Smooth only the valid MACD segment so that warm-up placeholders never
    # seed or leak into the signal line.
    valid_signal = ema(macd_line[start:], signal)
    first_signal = start + signal - 1
    for i in range(first_signal, n):
        signal_line[i] = valid_signal[i - start]
        histogram[i] = macd_line[i] - signal_line[i]

    return macd_line, signal_line, histogram


def macd_signal(
    data: list[float],
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> list[float]:
    """Return only the MACD signal line (see :func:`macd`)."""
    _, sig, _ = macd(data, fast, slow, signal)
    return sig


def macd_histogram(
    data: list[float],
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> list[float]:
    """Return only the MACD histogram (see :func:`macd`)."""
    _, _, hist = macd(data, fast, slow, signal)
    return hist


def adx(
    high: list[float],
    low: list[float],
    close: list[float],
    period: int = 14,
) -> list[float]:
    """Average Directional Index (ADX).

    Measures trend strength: ADX > 25 is commonly read as a strong trend,
    ADX < 20 as a ranging market.

    The output length is taken from ``close``; ``high`` and ``low`` are
    indexed at the same positions and are expected to be at least as long.

    Args:
        high: High-price series.
        low: Low-price series.
        close: Close-price series.
        period: Smoothing period (default 14).

    Returns:
        A list as long as ``close`` with values in [0, 100]. The first
        ``2 * period`` positions are 0.0. If ``period <= 0`` or there are
        fewer than ``2 * period`` bars, the whole result is zeros.
    """
    n = len(close)
    result = [0.0] * n
    # A non-positive period would divide by zero below; treat it like
    # "not enough data", consistent with sma()/ema().
    if period <= 0 or n < period * 2:
        return result

    # True range and directional movement (+DM / -DM); index 0 has no
    # previous bar and stays 0.
    tr = [0.0] * n
    plus_dm = [0.0] * n
    minus_dm = [0.0] * n
    for i in range(1, n):
        tr[i] = max(
            high[i] - low[i],
            abs(high[i] - close[i - 1]),
            abs(low[i] - close[i - 1]),
        )
        up_move = high[i] - high[i - 1]
        down_move = low[i - 1] - low[i]
        # Only the larger of the two moves counts, and only if positive.
        if up_move > down_move and up_move > 0:
            plus_dm[i] = up_move
        if down_move > up_move and down_move > 0:
            minus_dm[i] = down_move

    # Wilder smoothing (running sum form): seed with the plain sum of the
    # first `period` values, then prev - prev / period + current.
    tr_smooth = [0.0] * n
    plus_dm_smooth = [0.0] * n
    minus_dm_smooth = [0.0] * n
    tr_smooth[period] = sum(tr[1:period + 1])
    plus_dm_smooth[period] = sum(plus_dm[1:period + 1])
    minus_dm_smooth[period] = sum(minus_dm[1:period + 1])
    for i in range(period + 1, n):
        tr_smooth[i] = tr_smooth[i - 1] - tr_smooth[i - 1] / period + tr[i]
        plus_dm_smooth[i] = (
            plus_dm_smooth[i - 1] - plus_dm_smooth[i - 1] / period + plus_dm[i]
        )
        minus_dm_smooth[i] = (
            minus_dm_smooth[i - 1] - minus_dm_smooth[i - 1] / period + minus_dm[i]
        )

    # +DI / -DI and the directional index DX; DX stays 0 where the
    # denominators are zero (flat market).
    dx = [0.0] * n
    for i in range(period, n):
        if tr_smooth[i] > 0:
            pdi = 100 * plus_dm_smooth[i] / tr_smooth[i]
            mdi = 100 * minus_dm_smooth[i] / tr_smooth[i]
            di_sum = pdi + mdi
            if di_sum > 0:
                dx[i] = 100 * abs(pdi - mdi) / di_sum

    # ADX is the Wilder-smoothed DX: seeded at index 2 * period with the
    # mean of the preceding `period` DX values, then a (period - 1) : 1
    # weighted average of the previous ADX and the current DX.
    first = 2 * period
    if first < n:
        result[first] = sum(dx[period + 1:first + 1]) / period
        for i in range(first + 1, n):
            result[i] = (result[i - 1] * (period - 1) + dx[i]) / period

    return result