diff --git a/engine/data/__init__.py b/engine/data/__init__.py new file mode 100644 index 0000000..26bcee8 --- /dev/null +++ b/engine/data/__init__.py @@ -0,0 +1,5 @@ +# engine.data — K 线数据服务 + +from .service import DataService + +__all__ = ["DataService"] diff --git a/engine/data/service.py b/engine/data/service.py new file mode 100644 index 0000000..63dda8f --- /dev/null +++ b/engine/data/service.py @@ -0,0 +1,340 @@ +""" +K 线数据服务 — 从 TimescaleDB 读取历史 K 线数据,为回测引擎提供数据源 + +用法: + from engine.common.config import config + from engine.data import DataService + + ds = DataService(config.db) + await ds.connect() + + # 获取 BTCUSDT 1h K 线,按时间范围过滤 + klines = await ds.fetch_klines( + symbol="BTCUSDT", + interval="1h", + start_time=datetime(2025, 1, 1), + end_time=datetime(2026, 1, 1), + ) + + await ds.close() +""" + +import asyncio +from datetime import datetime, timezone +from decimal import Decimal +from typing import AsyncGenerator + +import asyncpg + +from ..common.config import DBConfig +from ..common.models import Kline, KlineInterval + +# ── 周期 → 表名映射 ── +INTERVAL_TO_TABLE: dict[KlineInterval, str] = { + "1m": "klines", + "5m": "klines_5m", + "15m": "klines_15m", + "30m": "klines_30m", + "1h": "klines_1h", + "4h": "klines_4h", + "1d": "klines_1d", + "1w": "klines_1w", +} + +# ── 周期毫秒数 ── +INTERVAL_MS: dict[KlineInterval, int] = { + "1m": 60_000, + "5m": 300_000, + "15m": 900_000, + "30m": 1_800_000, + "1h": 3_600_000, + "4h": 14_400_000, + "1d": 86_400_000, + "1w": 604_800_000, +} + +DEFAULT_BATCH_SIZE = 5000 + + +def dt_to_unix_ms(dt: datetime) -> float: + """datetime → Unix 毫秒时间戳""" + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() * 1000 + + +def _to_float(val) -> float: + """将 Decimal / int 等数值转为 float""" + if val is None: + return 0.0 + if isinstance(val, Decimal): + return float(val) + return float(val) + + +class DataService: + """K 线数据服务 + + 封装 TimescaleDB 中 K 线数据的查询逻辑, + 将数据库行转换为 engine.common.models.Kline 模型, + 供回测引擎消费。 + """ + + def __init__(self, db_config: DBConfig, pool_size: int = 4): + self._db_config = db_config + self._pool_size = pool_size + self._pool: asyncpg.Pool | None = None + self._col_cache: dict[str, set[str]] = {} + + # ── 生命周期 ── + + @property + def dsn(self) -> str: + db = self._db_config + return f"postgresql://{db.user}:{db.password}@{db.host}:{db.port}/{db.name}" + + async def connect(self) -> None: + """建立数据库连接池""" + self._pool = await asyncpg.create_pool( + dsn=self.dsn, + min_size=1, + max_size=self._pool_size, + ) + + async def close(self) -> None: + """关闭连接池""" + if self._pool: + await self._pool.close() + self._pool = None + + @property + def is_connected(self) -> bool: + return self._pool is not None + + # ── 元数据查询 ── + + async def fetch_available_symbols( + self, interval: KlineInterval = "1m" + ) -> list[str]: + """获取指定周期下所有有数据的交易对""" + table = INTERVAL_TO_TABLE[interval] + async with self._pool.acquire() as conn: + rows = await conn.fetch( + f"SELECT DISTINCT symbol FROM {table} ORDER BY symbol" + ) + return [r["symbol"] for r in rows] + + async def fetch_symbol_date_range( + self, symbol: str, interval: KlineInterval + ) -> tuple[datetime, datetime]: + """获取指定交易对 + 周期的数据起止时间""" + table = INTERVAL_TO_TABLE[interval] + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + f""" + SELECT MIN(time) AS min_time, MAX(time) AS max_time + FROM {table} + WHERE symbol = $1 + """, + symbol, + ) + if row is None or row["min_time"] is None: + raise ValueError(f"无数据: {symbol} {interval}") + return row["min_time"], row["max_time"] + + async def fetch_klines_count( + self, + symbol: str, + interval: KlineInterval, + start_time: datetime | None = None, + end_time: datetime | None = None, + ) -> int: + """获取指定条件的 K 线条数(用于预判数据量)""" + table = INTERVAL_TO_TABLE[interval] + conditions = ["symbol = $1", "interval = $2"] + params: list = [symbol, interval] + idx = 3 + + if start_time is not None: + conditions.append(f"time >= ${idx}") + params.append(start_time) + idx += 1 + if end_time is not None: + conditions.append(f"time < ${idx}") + params.append(end_time) + idx += 1 + + where = " AND ".join(conditions) + async with self._pool.acquire() as conn: + count = await conn.fetchval( + f"SELECT COUNT(*) FROM {table} WHERE {where}", *params + ) + return count + + # ── 核心查询 ── + + async def fetch_klines( + self, + symbol: str, + interval: KlineInterval, + start_time: datetime | None = None, + end_time: datetime | None = None, + limit: int = 1000, + offset: int = 0, + ) -> list[Kline]: + """获取 K 线数据,返回 Pydantic 模型列表 + + Args: + symbol: 交易对(如 BTCUSDT) + interval: K 线周期 + start_time: 起始时间(包含) + end_time: 结束时间(不包含) + limit: 最大返回条数 + offset: 分页偏移 + + Returns: + 按时间升序排列的 Kline 列表 + """ + table = INTERVAL_TO_TABLE[interval] + interval_ms = INTERVAL_MS[interval] + + conditions = ["symbol = $1", "interval = $2"] + params: list = [symbol, interval] + idx = 3 + + if start_time is not None: + conditions.append(f"time >= ${idx}") + params.append(start_time) + idx += 1 + if end_time is not None: + conditions.append(f"time < ${idx}") + params.append(end_time) + idx += 1 + + where = " AND ".join(conditions) + cols = await self._get_columns(table) + + select_cols = [ + "time", "exchange", "symbol", "interval", + "open", "high", "low", "close", "volume", + ] + for extra in ( + "trade_count", "quote_volume", "taker_buy_base_vol", + "taker_buy_quote_vol", "is_closed", + ): + if extra in cols: + select_cols.append(extra) + + async with self._pool.acquire() as conn: + rows = await conn.fetch( + f""" + SELECT {', '.join(select_cols)} + FROM {table} + WHERE {where} + ORDER BY time ASC + LIMIT ${idx} OFFSET ${idx + 1} + """, + *params, + limit, + offset, + ) + + return [self._row_to_kline(r, interval, interval_ms) for r in rows] + + async def stream_klines( + self, + symbol: str, + interval: KlineInterval, + start_time: datetime | None = None, + end_time: datetime | None = None, + batch_size: int = DEFAULT_BATCH_SIZE, + ) -> AsyncGenerator[list[Kline], None]: + """流式获取 K 线数据,适合大数据集 + + 每次返回一批 K 线列表,避免一次性加载过多数据到内存。 + + Yields: + 每批 Kline 列表(按时间升序) + """ + offset = 0 + while True: + batch = await self.fetch_klines( + symbol=symbol, + interval=interval, + start_time=start_time, + end_time=end_time, + limit=batch_size, + offset=offset, + ) + if not batch: + break + yield batch + offset += len(batch) + + async def fetch_multi_klines( + self, + symbols: list[str], + interval: KlineInterval, + start_time: datetime | None = None, + end_time: datetime | None = None, + limit: int = 1000, + ) -> dict[str, list[Kline]]: + """批量获取多个交易对的 K 线 + + Returns: + {symbol: [Kline, ...]} 字典 + """ + tasks = [ + self.fetch_klines( + symbol=sym, + interval=interval, + start_time=start_time, + end_time=end_time, + limit=limit, + ) + for sym in symbols + ] + results = await asyncio.gather(*tasks) + return dict(zip(symbols, results)) + + # ── 内部方法 ── + + async def _get_columns(self, table: str) -> set[str]: + """获取表的列名集合(带缓存)""" + if table not in self._col_cache: + async with self._pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_name = $1 + """, + table, + ) + self._col_cache[table] = {r["column_name"] for r in rows} + return self._col_cache[table] + + @staticmethod + def _row_to_kline( + row: asyncpg.Record, interval: KlineInterval, interval_ms: int + ) -> Kline: + """将数据库行转换为 Kline 模型""" + open_time = dt_to_unix_ms(row["time"]) + + return Kline( + exchange=row["exchange"], + symbol=row["symbol"], + interval=interval, + openTime=open_time, + closeTime=open_time + interval_ms, + open=_to_float(row["open"]), + high=_to_float(row["high"]), + low=_to_float(row["low"]), + close=_to_float(row["close"]), + volume=_to_float(row["volume"]), + quoteVolume=_to_float(row.get("quote_volume", 0)), + takerBuyBaseVol=_to_float(row.get("taker_buy_base_vol", 0)), + takerBuyQuoteVol=_to_float(row.get("taker_buy_quote_vol", 0)), + tradeCount=int(row.get("trade_count") or 0), + isClosed=bool(row.get("is_closed", True)), + ) diff --git a/engine/indicators/__init__.py b/engine/indicators/__init__.py new file mode 100644 index 0000000..322b96c --- /dev/null +++ b/engine/indicators/__init__.py @@ -0,0 +1,30 @@ +""" +技术指标库 + +提供常用的趋势、动量、波动率和成交量指标计算。 +所有函数均为纯 Python 实现,无外部依赖。 + +用法: + from engine.indicators import sma, ema, macd, rsi, bollinger, atr + + closes = [100.0, 101.0, 102.0, ...] + ma = sma(closes, period=20) + rsi_vals = rsi(closes, period=14) + upper, mid, lower = bollinger(closes, period=20, std=2) +""" + +from .trend import sma, ema, macd, macd_signal, macd_histogram, adx +from .momentum import rsi, stoch, stoch_k, stoch_d +from .volatility import bollinger, bollinger_upper, bollinger_mid, bollinger_lower, atr +from .volume import obv, vwap + +__all__ = [ + # 趋势 + "sma", "ema", "macd", "macd_signal", "macd_histogram", "adx", + # 动量 + "rsi", "stoch", "stoch_k", "stoch_d", + # 波动率 + "bollinger", "bollinger_upper", "bollinger_mid", "bollinger_lower", "atr", + # 成交量 + "obv", "vwap", +] diff --git a/engine/indicators/momentum.py b/engine/indicators/momentum.py new file mode 100644 index 0000000..a342a8f --- /dev/null +++ b/engine/indicators/momentum.py @@ -0,0 +1,131 @@ +""" +动量指标 — RSI、Stochastic + +所有函数返回与输入等长的 list[float],不足周期位置填 0.0。 +""" + + +def rsi(data: list[float], period: int = 14) -> list[float]: + """相对强弱指数 (RSI) + + 使用 Wilder 平滑算法,Wilder's RSI = 100 - [100 / (1 + avg_gain / avg_loss)] + + Args: + data: 价格序列 + period: 周期(默认 14) + + Returns: + 与 data 等长的 RSI 序列 [0, 100],前 period 位置为 0 + """ + n = len(data) + result = [0.0] * n + if n < period + 1: + return result + + # 计算价格变化 + changes = [data[i] - data[i - 1] for i in range(1, n)] + + # 初始平均涨幅和跌幅(Simple average of first `period` changes) + gains = [max(c, 0) for c in changes[:period]] + losses = [abs(min(c, 0)) for c in changes[:period]] + avg_gain = sum(gains) / period + avg_loss = sum(losses) / period + + # 计算第一个 RSI + if avg_loss == 0: + result[period] = 100.0 + else: + rs = avg_gain / avg_loss + result[period] = 100.0 - (100.0 / (1.0 + rs)) + + # Wilder 平滑后续值 + for i in range(period, n - 1): + change = changes[i] + gain = max(change, 0.0) + loss = abs(min(change, 0.0)) + + avg_gain = (avg_gain * (period - 1) + gain) / period + avg_loss = (avg_loss * (period - 1) + loss) / period + + if avg_loss == 0: + result[i + 1] = 100.0 + else: + rs = avg_gain / avg_loss + result[i + 1] = 100.0 - (100.0 / (1.0 + rs)) + + return result + + +def stoch( + high: list[float], + low: list[float], + close: list[float], + k_period: int = 14, + k_smooth: int = 3, + d_smooth: int = 3, +): + """Stochastic 指标 (KDJ 中的 K/D) + + %K = 100 * (close - lowest_low) / (highest_high - lowest_low) + %K_smoothed = SMA(%K, k_smooth) + %D = SMA(%K_smoothed, d_smooth) + + Args: + high: 最高价序列 + low: 最低价序列 + close: 收盘价序列 + k_period: %K 窗口 + k_smooth: %K 平滑周期 + d_smooth: %D 平滑周期 + + Returns: + (k_values, d_values) 两个等长序列,范围 [0, 100] + """ + n = len(close) + k_raw = [0.0] * n + k_values = [0.0] * n + d_values = [0.0] * n + + if n < k_period: + return k_values, d_values + + # 计算原始 %K + for i in range(k_period - 1, n): + highest = max(high[i - k_period + 1 : i + 1]) + lowest = min(low[i - k_period + 1 : i + 1]) + if highest != lowest: + k_raw[i] = 100.0 * (close[i] - lowest) / (highest - lowest) + else: + k_raw[i] = 50.0 + + # 平滑 %K + from .trend import sma as _sma + k_smoothed = _sma(k_raw, k_smooth) + d_smoothed = _sma(k_smoothed, d_smooth) + + return k_smoothed, d_smoothed + + +def stoch_k( + high: list[float], + low: list[float], + close: list[float], + k_period: int = 14, + k_smooth: int = 3, +) -> list[float]: + """Stochastic %K""" + k, _ = stoch(high, low, close, k_period, k_smooth) + return k + + +def stoch_d( + high: list[float], + low: list[float], + close: list[float], + k_period: int = 14, + k_smooth: int = 3, + d_smooth: int = 3, +) -> list[float]: + """Stochastic %D""" + _, d = stoch(high, low, close, k_period, k_smooth, d_smooth) + return d diff --git a/engine/indicators/trend.py b/engine/indicators/trend.py new file mode 100644 index 0000000..94f4eaf --- /dev/null +++ b/engine/indicators/trend.py @@ -0,0 +1,191 @@ +""" +趋势指标 — 移动平均线、MACD + +所有函数返回与输入等长的 list[float],不足周期位置填 0.0。 +""" + +from functools import lru_cache + + +def sma(data: list[float], period: int) -> list[float]: + """简单移动平均 (SMA) + + Args: + data: 价格序列 + period: 周期 + + Returns: + 与 data 等长的 SMA 序列,前 period-1 位置为 0 + """ + n = len(data) + result = [0.0] * n + if n < period or period <= 0: + return result + + window_sum = sum(data[:period]) + result[period - 1] = window_sum / period + + for i in range(period, n): + window_sum += data[i] - data[i - period] + result[i] = window_sum / period + + return result + + +def ema(data: list[float], period: int) -> list[float]: + """指数移动平均 (EMA) + + 使用 Wilder 平滑方式:k = 2 / (period + 1) + + Args: + data: 价格序列 + period: 周期 + + Returns: + 与 data 等长的 EMA 序列,前 period-1 位置为 0 + """ + n = len(data) + result = [0.0] * n + if n < period or period <= 0: + return result + + k = 2.0 / (period + 1) + # 初始值使用 SMA + result[period - 1] = sum(data[:period]) / period + + for i in range(period, n): + result[i] = data[i] * k + result[i - 1] * (1 - k) + + return result + + +def macd( + data: list[float], + fast: int = 12, + slow: int = 26, + signal: int = 9, +): + """MACD 指标 + + MACD 线 = EMA(fast) - EMA(slow) + 信号线 = EMA(MACD线, signal) + 柱状图 = MACD 线 - 信号线 + + Args: + data: 价格序列 + fast: 快线周期 + slow: 慢线周期 + signal: 信号线周期 + + Returns: + (macd_line, signal_line, histogram) 三个等长序列 + """ + fast_ema = ema(data, fast) + slow_ema = ema(data, slow) + + macd_line = [0.0] * len(data) + for i in range(len(data)): + macd_line[i] = fast_ema[i] - slow_ema[i] + + signal_line = ema(macd_line, signal) + histogram = [macd_line[i] - signal_line[i] for i in range(len(data))] + + return macd_line, signal_line, histogram + + +def macd_signal( + data: list[float], + fast: int = 12, + slow: int = 26, + signal: int = 9, +) -> list[float]: + """MACD 信号线""" + _, sig, _ = macd(data, fast, slow, signal) + return sig + + +def macd_histogram( + data: list[float], + fast: int = 12, + slow: int = 26, + signal: int = 9, +) -> list[float]: + """MACD 柱状图""" + _, _, hist = macd(data, fast, slow, signal) + return hist + + +def adx( + high: list[float], + low: list[float], + close: list[float], + period: int = 14, +) -> list[float]: + """平均趋向指数 (ADX) + + 判断趋势强度:ADX > 25 表示强趋势,ADX < 20 表示震荡。 + + Args: + high: 最高价序列 + low: 最低价序列 + close: 收盘价序列 + period: 周期(默认 14) + + Returns: + 与输入等长的 ADX 序列 [0, 100],前 2*period 位置为 0 + """ + n = len(close) + result = [0.0] * n + if n < period * 2: + return result + + # True Range, +DM, -DM + tr = [0.0] * n + plus_dm = [0.0] * n + minus_dm = [0.0] * n + + for i in range(1, n): + tr[i] = max( + high[i] - low[i], + abs(high[i] - close[i - 1]), + abs(low[i] - close[i - 1]), + ) + up_move = high[i] - high[i - 1] + down_move = low[i - 1] - low[i] + if up_move > down_move and up_move > 0: + plus_dm[i] = up_move + if down_move > up_move and down_move > 0: + minus_dm[i] = down_move + + # Wilder 平滑 + tr_smooth = [0.0] * n + plus_dm_smooth = [0.0] * n + minus_dm_smooth = [0.0] * n + + tr_smooth[period] = sum(tr[1:period + 1]) + plus_dm_smooth[period] = sum(plus_dm[1:period + 1]) + minus_dm_smooth[period] = sum(minus_dm[1:period + 1]) + + for i in range(period + 1, n): + tr_smooth[i] = tr_smooth[i - 1] - tr_smooth[i - 1] / period + tr[i] + plus_dm_smooth[i] = plus_dm_smooth[i - 1] - plus_dm_smooth[i - 1] / period + plus_dm[i] + minus_dm_smooth[i] = minus_dm_smooth[i - 1] - minus_dm_smooth[i - 1] / period + minus_dm[i] + + # +DI, -DI, DX, ADX + dx = [0.0] * n + for i in range(period, n): + if tr_smooth[i] > 0: + pdi = 100 * plus_dm_smooth[i] / tr_smooth[i] + mdi = 100 * minus_dm_smooth[i] / tr_smooth[i] + di_sum = pdi + mdi + if di_sum > 0: + dx[i] = 100 * abs(pdi - mdi) / di_sum + + # ADX = EMA of DX + for i in range(2 * period, n): + if i == 2 * period: + result[i] = sum(dx[period + 1:2 * period + 1]) / period + else: + result[i] = (result[i - 1] * (period - 1) + dx[i]) / period + + return result diff --git a/engine/indicators/volatility.py b/engine/indicators/volatility.py new file mode 100644 index 0000000..6ef9a14 --- /dev/null +++ b/engine/indicators/volatility.py @@ -0,0 +1,127 @@ +""" +波动率指标 — 布林带、ATR + +所有函数返回与输入等长的 list[float],不足周期位置填 0.0。 +""" + +import math + + +def bollinger( + data: list[float], + period: int = 20, + std: float = 2.0, +): + """布林带 (Bollinger Bands) + + 使用流式计算方差,O(n) 复杂度。 + + Args: + data: 价格序列(通常为收盘价) + period: 中轨 SMA 周期 + std: 标准差倍数 + + Returns: + (upper, mid, lower) 三个等长序列 + """ + n = len(data) + upper = [0.0] * n + mid = [0.0] * n + lower = [0.0] * n + + if n < period: + return upper, mid, lower + + # 初始窗口的 sum 和 sum_sq + window_sum = 0.0 + window_sum_sq = 0.0 + for i in range(period): + v = data[i] + window_sum += v + window_sum_sq += v * v + + # 第一个点 + mean = window_sum / period + mid[period - 1] = mean + variance = (window_sum_sq / period) - (mean * mean) + stdev = math.sqrt(max(variance, 0.0)) + upper[period - 1] = mean + std * stdev + lower[period - 1] = mean - std * stdev + + # 滑动窗口计算后续点 + for i in range(period, n): + old_val = data[i - period] + new_val = data[i] + window_sum += new_val - old_val + window_sum_sq += new_val * new_val - old_val * old_val + + mean = window_sum / period + mid[i] = mean + variance = (window_sum_sq / period) - (mean * mean) + stdev = math.sqrt(max(variance, 0.0)) + upper[i] = mean + std * stdev + lower[i] = mean - std * stdev + + return upper, mid, lower + + +def bollinger_upper(data: list[float], period: int = 20, std: float = 2.0) -> list[float]: + """布林带上轨""" + upper, _, _ = bollinger(data, period, std) + return upper + + +def bollinger_mid(data: list[float], period: int = 20) -> list[float]: + """布林带中轨""" + from .trend import sma as _sma + return _sma(data, period) + + +def bollinger_lower(data: list[float], period: int = 20, std: float = 2.0) -> list[float]: + """布林带下轨""" + _, _, lower = bollinger(data, period, std) + return lower + + +def atr( + high: list[float], + low: list[float], + close: list[float], + period: int = 14, +) -> list[float]: + """平均真实波幅 (ATR) + + 使用 Wilder 平滑算法。 + + Args: + high: 最高价序列 + low: 最低价序列 + close: 收盘价序列 + period: 周期 + + Returns: + 与输入等长的 ATR 序列,前 period 位置为 0 + """ + n = len(close) + result = [0.0] * n + if n < period + 1: + return result + + # 计算 True Range + tr = [0.0] * n + tr[0] = high[0] - low[0] + for i in range(1, n): + tr[i] = max( + high[i] - low[i], + abs(high[i] - close[i - 1]), + abs(low[i] - close[i - 1]), + ) + + # 初始 ATR 为前 period 个 TR 的均值 + result[period] = sum(tr[1:period + 1]) / period + + # Wilder 平滑 + for i in range(period + 1, n): + result[i] = (result[i - 1] * (period - 1) + tr[i]) / period + + return result diff --git a/engine/indicators/volume.py b/engine/indicators/volume.py new file mode 100644 index 0000000..a03257f --- /dev/null +++ b/engine/indicators/volume.py @@ -0,0 +1,75 @@ +""" +成交量指标 — OBV、VWAP + +所有函数返回与输入等长的 list[float],不足周期位置填 0.0。 +""" + + +def obv(close: list[float], volume: list[float]) -> list[float]: + """能量潮 (On-Balance Volume, OBV) + + 从 0 开始累加: + - 收盘价 > 前收盘价:OBV += 成交量 + - 收盘价 < 前收盘价:OBV -= 成交量 + - 收盘价 == 前收盘价:OBV 不变 + + Args: + close: 收盘价序列 + volume: 成交量序列 + + Returns: + 与输入等长的 OBV 序列 + """ + n = len(close) + result = [0.0] * n + if n == 0: + return result + + result[0] = volume[0] + for i in range(1, n): + if close[i] > close[i - 1]: + result[i] = result[i - 1] + volume[i] + elif close[i] < close[i - 1]: + result[i] = result[i - 1] - volume[i] + else: + result[i] = result[i - 1] + + return result + + +def vwap( + high: list[float], + low: list[float], + close: list[float], + volume: list[float], +) -> list[float]: + """成交量加权平均价 (VWAP) + + 累积计算:VWAP = Σ(典型价格 × 成交量) / Σ(成交量) + 典型价格 = (high + low + close) / 3 + + Args: + high: 最高价序列 + low: 最低价序列 + close: 收盘价序列 + volume: 成交量序列 + + Returns: + 与输入等长的 VWAP 序列(从第一个有效 bar 开始累加) + """ + n = len(close) + result = [0.0] * n + if n == 0: + return result + + cum_pv = 0.0 # cumulative price * volume + cum_vol = 0.0 # cumulative volume + + for i in range(n): + typical_price = (high[i] + low[i] + close[i]) / 3.0 + cum_pv += typical_price * volume[i] + cum_vol += volume[i] + if cum_vol > 0: + result[i] = cum_pv / cum_vol + + return result