Files
trade/engine/indicators/trend.py
T
Rekey 212f6fedad feat(engine): 添加数据服务层与技术指标库
- data/service.py: 数据拉取服务,从 TimescaleDB 读取 K 线/Ticker 等行情数据
- indicators/momentum.py: 动量类指标(RSI/MACD/Stochastic 等)
- indicators/trend.py: 趋势类指标(EMA/SMA/ADX/SuperTrend 等)
- indicators/volatility.py: 波动率指标(Bollinger/ATR/Keltner 等)
- indicators/volume.py: 成交量指标(OBV/VWAP/MFI 等)
2026-06-12 10:26:45 +08:00

192 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
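Trend indicators: moving averages (SMA, EMA), MACD and ADX.
Every function returns series as long as its input (macd returns a tuple of
three such series); positions without enough history are filled with 0.0.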
"""
from functools import lru_cache
def sma(data: list[float], period: int) -> list[float]:
    """Simple moving average (SMA).

    Args:
        data: Price series.
        period: Window length.

    Returns:
        A list as long as ``data``. The first ``period - 1`` positions are
        0.0; the whole list is 0.0 when ``period`` is not positive or
        ``data`` is shorter than ``period``.
    """
    size = len(data)
    averages = [0.0] * size
    if period <= 0 or size < period:
        return averages

    running = sum(data[:period])
    averages[period - 1] = running / period
    # Slide the window one step at a time: each incoming value is paired with
    # the value that drops out of the window (the one `period` places back).
    for pos, (incoming, outgoing) in enumerate(zip(data[period:], data), start=period):
        running += incoming - outgoing
        averages[pos] = running / period
    return averages
def ema(data: list[float], period: int) -> list[float]:
    """Exponential moving average (EMA).

    Uses the smoothing factor ``k = 2 / (period + 1)`` and seeds the
    recursion with the simple average of the first ``period`` values.

    Args:
        data: Price series.
        period: Averaging period.

    Returns:
        A list as long as ``data``. The first ``period - 1`` positions are
        0.0; the whole list is 0.0 when ``period`` is not positive or
        ``data`` is shorter than ``period``.
    """
    size = len(data)
    smoothed = [0.0] * size
    if period <= 0 or size < period:
        return smoothed

    alpha = 2.0 / (period + 1)
    decay = 1 - alpha
    # Seed value: plain average of the first full window.
    previous = sum(data[:period]) / period
    smoothed[period - 1] = previous
    for pos in range(period, size):
        previous = data[pos] * alpha + previous * decay
        smoothed[pos] = previous
    return smoothed
def macd(
    data: list[float],
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> tuple[list[float], list[float], list[float]]:
    """MACD indicator.

    MACD line   = EMA(fast) - EMA(slow)
    Signal line = EMA(MACD line, signal)
    Histogram   = MACD line - signal line

    Positions without enough history hold 0.0:
    the MACD line starts at index ``max(fast, slow) - 1`` (the first bar
    where both EMAs exist), and the signal line and histogram start
    ``signal - 1`` bars after that.

    Args:
        data: Price series.
        fast: Fast EMA period.
        slow: Slow EMA period.
        signal: Signal-line EMA period.

    Returns:
        ``(macd_line, signal_line, histogram)``, three lists as long as
        ``data``. All three are entirely 0.0 when any period is not
        positive or ``data`` is too short for both EMAs.
    """
    n = len(data)
    macd_line = [0.0] * n
    signal_line = [0.0] * n
    histogram = [0.0] * n

    # Index of the first bar at which both the fast and slow EMA are defined.
    warmup = max(fast, slow) - 1
    if min(fast, slow, signal) <= 0 or n <= warmup:
        return macd_line, signal_line, histogram

    fast_ema = ema(data, fast)
    slow_ema = ema(data, slow)
    # Before `warmup` one of the EMAs is still its 0.0 placeholder, so the
    # difference there would be a raw price level rather than a MACD value.
    for i in range(warmup, n):
        macd_line[i] = fast_ema[i] - slow_ema[i]

    # Smooth only the valid part of the MACD line so the signal EMA is seeded
    # from real MACD values instead of warm-up placeholders.
    signal_tail = ema(macd_line[warmup:], signal)
    first_signal = warmup + signal - 1
    for i in range(first_signal, n):
        signal_line[i] = signal_tail[i - warmup]
        histogram[i] = macd_line[i] - signal_line[i]
    return macd_line, signal_line, histogram
def macd_signal(
    data: list[float],
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> list[float]:
    """Return only the signal line of the MACD indicator.

    Convenience wrapper: forwards all arguments to ``macd`` and returns the
    second element of its result tuple.
    """
    return macd(data, fast, slow, signal)[1]
def macd_histogram(
    data: list[float],
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> list[float]:
    """Return only the histogram of the MACD indicator.

    Convenience wrapper: forwards all arguments to ``macd`` and returns the
    third element of its result tuple.
    """
    return macd(data, fast, slow, signal)[2]
def adx(
    high: list[float],
    low: list[float],
    close: list[float],
    period: int = 14,
) -> list[float]:
    """Average Directional Index (ADX).

    Measures trend strength: ADX > 25 indicates a strong trend, ADX < 20
    indicates a ranging market.

    Args:
        high: High prices.
        low: Low prices.
        close: Close prices. Its length defines the output length; ``high``
            and ``low`` are indexed over the same range, so they must be at
            least as long.
        period: Smoothing period (default 14).

    Returns:
        A list as long as ``close`` with values in [0, 100]. The first
        ``2 * period`` positions are 0.0; the whole list is 0.0 when
        ``period`` is not positive or fewer than ``2 * period`` bars are
        supplied.
    """
    n = len(close)
    result = [0.0] * n
    # A non-positive period would divide by zero (period == 0) or index the
    # buffers from the end (period < 0); treat it like insufficient data.
    if period <= 0 or n < period * 2:
        return result

    # Per-bar true range and directional movement. Index 0 has no previous
    # bar to compare against and stays 0.
    tr = [0.0] * n
    plus_dm = [0.0] * n
    minus_dm = [0.0] * n
    for i in range(1, n):
        prev_close = close[i - 1]
        tr[i] = max(
            high[i] - low[i],
            abs(high[i] - prev_close),
            abs(low[i] - prev_close),
        )
        up_move = high[i] - high[i - 1]
        down_move = low[i - 1] - low[i]
        # Only the larger, positive move counts; ties count for neither side.
        if up_move > down_move and up_move > 0:
            plus_dm[i] = up_move
        if down_move > up_move and down_move > 0:
            minus_dm[i] = down_move

    # Wilder smoothing, seeded with the plain sums over bars 1..period, and
    # the DX value derived from the smoothed figures at each bar.
    tr_smooth = sum(tr[1:period + 1])
    plus_smooth = sum(plus_dm[1:period + 1])
    minus_smooth = sum(minus_dm[1:period + 1])
    dx = [0.0] * n
    for i in range(period, n):
        if i > period:
            tr_smooth = tr_smooth - tr_smooth / period + tr[i]
            plus_smooth = plus_smooth - plus_smooth / period + plus_dm[i]
            minus_smooth = minus_smooth - minus_smooth / period + minus_dm[i]
        if tr_smooth > 0:
            pdi = 100 * plus_smooth / tr_smooth
            mdi = 100 * minus_smooth / tr_smooth
            di_sum = pdi + mdi
            if di_sum > 0:
                dx[i] = 100 * abs(pdi - mdi) / di_sum

    # ADX: seeded at index 2 * period with the mean of the DX values at
    # indices period + 1 .. 2 * period, then Wilder-smoothed.
    first = 2 * period
    if first < n:
        result[first] = sum(dx[period + 1:first + 1]) / period
        for i in range(first + 1, n):
            result[i] = (result[i - 1] * (period - 1) + dx[i]) / period
    return result