"""Technical-indicator calculation and plain-text summaries for candle data."""

import logging

import pandas as pd
import ta

import config

logger = logging.getLogger(__name__)

# Minimum number of candles required before any indicator is computed.
_MIN_CANDLES = 30


def _has_values(row: pd.Series, *keys: str) -> bool:
    """Return True when every key is present in *row* with a non-NaN value."""
    return all(pd.notna(row.get(key)) for key in keys)


def calculate_indicators(candles_df: pd.DataFrame) -> pd.DataFrame:
    """Add technical indicators to a candles DataFrame.

    Expected input columns: timestamp, open, close, high, low, volume.

    The input is never mutated; a copy is returned. When fewer than
    ``_MIN_CANDLES`` rows are supplied a warning is logged and the copy is
    returned without any indicator columns.

    Added columns: rsi, macd, macd_signal, macd_hist, bb_upper, bb_middle,
    bb_lower, ema9, ema21, vwap, atr, vol_ma20, adx, stoch_rsi_k,
    stoch_rsi_d, obv, obv_slope, cmf.
    """
    df = candles_df.copy()
    if df.empty or len(df) < _MIN_CANDLES:
        logger.warning(
            "Not enough candle data to compute indicators (got %d rows)", len(df)
        )
        return df

    close = df["close"]
    high = df["high"]
    low = df["low"]
    volume = df["volume"]

    # RSI(14)
    df["rsi"] = ta.momentum.rsi(close, window=14)

    # MACD(12, 26, 9)
    macd = ta.trend.MACD(close, window_slow=26, window_fast=12, window_sign=9)
    df["macd"] = macd.macd()
    df["macd_signal"] = macd.macd_signal()
    df["macd_hist"] = macd.macd_diff()

    # Bollinger Bands(20, 2)
    bb = ta.volatility.BollingerBands(close, window=20, window_dev=2)
    df["bb_upper"] = bb.bollinger_hband()
    df["bb_middle"] = bb.bollinger_mavg()
    df["bb_lower"] = bb.bollinger_lband()

    # EMA(9) and EMA(21)
    df["ema9"] = ta.trend.ema_indicator(close, window=9)
    df["ema21"] = ta.trend.ema_indicator(close, window=21)

    # VWAP, accumulated over the whole input frame (there is no per-session
    # reset here). Zero cumulative volume becomes NaN to avoid dividing by 0.
    typical_price = (high + low + close) / 3
    cumvol = volume.cumsum()
    df["vwap"] = (typical_price * volume).cumsum() / cumvol.replace(0, float("nan"))

    # ATR(14)
    df["atr"] = ta.volatility.average_true_range(high, low, close, window=14)

    # Volume moving average (20)
    df["vol_ma20"] = volume.rolling(window=20).mean()

    # ADX(14) - trend strength
    df["adx"] = ta.trend.adx(high, low, close, window=14)

    # Stochastic RSI(14, 3, 3)
    df["stoch_rsi_k"] = ta.momentum.stochrsi_k(close, window=14, smooth1=3, smooth2=3)
    df["stoch_rsi_d"] = ta.momentum.stochrsi_d(close, window=14, smooth1=3, smooth2=3)

    # OBV (On-Balance Volume) and its change over the last 5 rows
    df["obv"] = ta.volume.on_balance_volume(close, volume)
    df["obv_slope"] = df["obv"].diff(5)

    # CMF (Chaikin Money Flow, 20-period)
    df["cmf"] = ta.volume.chaikin_money_flow(high, low, close, volume, window=20)

    return df


def calculate_htf_indicators(candles_df: pd.DataFrame) -> pd.DataFrame:
    """Calculate trend-context indicators on higher timeframe (1h) candles.

    Returns a copy of the input with ema9, ema21, adx and rsi columns added.
    With fewer than ``_MIN_CANDLES`` rows the copy is returned unchanged
    (no indicator columns, no log message).
    """
    df = candles_df.copy()
    if df.empty or len(df) < _MIN_CANDLES:
        return df

    close = df["close"]
    high = df["high"]
    low = df["low"]

    df["ema9"] = ta.trend.ema_indicator(close, window=9)
    df["ema21"] = ta.trend.ema_indicator(close, window=21)
    df["adx"] = ta.trend.adx(high, low, close, window=14)
    df["rsi"] = ta.momentum.rsi(close, window=14)

    return df


def calculate_pivot_points(htf_df: pd.DataFrame) -> dict | None:
    """Calculate classic pivot points from the previous day's 1h candles.

    The "previous day" is the second-latest calendar date found in the
    ``timestamp`` column (which must support the ``.dt`` accessor).

    Returns a dict with keys pivot, r1, r2, r3, s1, s2, s3, or None when
    there are fewer than 24 rows, fewer than two distinct dates, or the
    previous day's high/low/close cannot produce a numeric pivot.
    """
    if htf_df.empty or len(htf_df) < 24:
        return None

    df = htf_df.copy()
    df["date"] = df["timestamp"].dt.date
    dates = sorted(df["date"].unique())
    if len(dates) < 2:
        return None

    # Sort by time so that iloc[-1] really is the day's closing candle even
    # when the caller passes rows out of order.
    prev_day = df[df["date"] == dates[-2]].sort_values("timestamp", kind="stable")
    if prev_day.empty:
        return None

    day_high = prev_day["high"].max()
    day_low = prev_day["low"].min()
    day_close = prev_day["close"].iloc[-1]
    pivot = (day_high + day_low + day_close) / 3
    if pd.isna(pivot):
        # NaN inputs would otherwise yield a dict full of NaN levels.
        return None

    day_range = day_high - day_low
    return {
        "pivot": pivot,
        "r1": 2 * pivot - day_low,
        "r2": pivot + day_range,
        "r3": day_high + 2 * (pivot - day_low),
        "s1": 2 * pivot - day_high,
        "s2": pivot - day_range,
        "s3": day_low - 2 * (day_high - pivot),
    }


def summarize_indicators(df: pd.DataFrame, symbol: str) -> str:
    """Produce a concise text summary of the latest indicator values for LLM consumption.

    Only the last row is reported (the second-to-last row is used for the
    MACD histogram direction). An indicator line is emitted only when every
    value it prints is present and non-NaN, so the summary never contains
    "nan" or labels derived from missing data.
    """
    if df.empty or len(df) < 2:
        return f"{symbol}: insufficient data\n"

    last = df.iloc[-1]
    prev = df.iloc[-2]
    name = config.SYMBOL_NAMES.get(symbol, symbol)

    lines = [
        f"### {name} ({symbol})",
        f"Price: {last['close']:.6g} | High: {last['high']:.6g} | Low: {last['low']:.6g}",
    ]

    volume_line = f"Volume: {last['volume']:.2f}"
    if _has_values(last, "vol_ma20"):
        volume_line += f" | Vol MA20: {last['vol_ma20']:.2f}"
    lines.append(volume_line)

    if _has_values(last, "rsi"):
        lines.append(f"RSI(14): {last['rsi']:.1f}")

    # The signal line and histogram may still be NaN on rows where the MACD
    # line itself is already defined, so all three are required.
    if _has_values(last, "macd", "macd_signal", "macd_hist"):
        macd_line = (
            f"MACD: {last['macd']:.6g} | Signal: {last['macd_signal']:.6g} | "
            f"Hist: {last['macd_hist']:.6g}"
        )
        prev_hist = prev.get("macd_hist")
        if pd.notna(prev_hist):
            # Direction is only meaningful with a real previous value.
            hist_dir = "↑" if last["macd_hist"] > prev_hist else "↓"
            macd_line += f" ({hist_dir})"
        lines.append(macd_line)

    if _has_values(last, "bb_upper", "bb_middle", "bb_lower"):
        lines.append(
            f"BB: Upper={last['bb_upper']:.6g} Mid={last['bb_middle']:.6g} "
            f"Lower={last['bb_lower']:.6g}"
        )

    if _has_values(last, "ema9", "ema21"):
        trend = "bullish" if last["ema9"] > last["ema21"] else "bearish"
        lines.append(f"EMA9: {last['ema9']:.6g} | EMA21: {last['ema21']:.6g} → {trend}")

    if _has_values(last, "atr"):
        lines.append(f"ATR(14): {last['atr']:.6g}")

    if _has_values(last, "vwap"):
        lines.append(f"VWAP: {last['vwap']:.6g}")

    if _has_values(last, "adx"):
        adx = last["adx"]
        if adx > 25:
            adx_label = "趨勢"
        elif adx < 20:
            adx_label = "盤整"
        else:
            adx_label = "弱"
        lines.append(f"ADX(14): {adx:.1f} ({adx_label})")

    if _has_values(last, "stoch_rsi_k", "stoch_rsi_d"):
        lines.append(f"StochRSI: K={last['stoch_rsi_k']:.2f} D={last['stoch_rsi_d']:.2f}")

    if _has_values(last, "obv_slope"):
        obv_dir = "流入" if last["obv_slope"] > 0 else "流出"
        lines.append(f"OBV趨勢: {obv_dir} (slope={last['obv_slope']:.0f})")

    if _has_values(last, "cmf"):
        cmf = last["cmf"]
        if cmf > 0.05:
            cmf_label = "買壓"
        elif cmf < -0.05:
            cmf_label = "賣壓"
        else:
            cmf_label = "中性"
        lines.append(f"CMF(20): {cmf:.3f} ({cmf_label})")

    return "\n".join(lines) + "\n"


def summarize_htf(htf_by_symbol: dict[str, pd.DataFrame]) -> str:
    """Produce a compact 1h trend context summary for LLM.

    Symbols are listed in sorted order. A symbol is skipped when its frame
    has fewer than two rows or when any of ema9/ema21/adx/rsi is missing or
    NaN on the last row, so no trend label is reported from placeholder
    values.
    """
    lines = ["## 1小時趨勢背景"]
    for sym in sorted(htf_by_symbol):
        df = htf_by_symbol[sym]
        if df.empty or len(df) < 2:
            continue

        last = df.iloc[-1]
        if not _has_values(last, "ema9", "ema21", "adx", "rsi"):
            continue

        name = config.SYMBOL_NAMES.get(sym, sym)
        trend = "多頭" if last["ema9"] > last["ema21"] else "空頭"

        adx_val = last["adx"]
        if adx_val > 25:
            strength = "強趨勢"
        elif adx_val < 20:
            strength = "盤整"
        else:
            strength = "弱趨勢"

        rsi_1h = last["rsi"]
        lines.append(f"- {name}: {trend} ({strength}, ADX={adx_val:.0f}, RSI={rsi_1h:.0f})")

    return "\n".join(lines) + "\n"


def summarize_pivots(pivots_by_symbol: dict[str, dict | None], current_prices: dict) -> str:
    """Summarize pivot point levels relative to current price.

    For each symbol (sorted) the nearest level strictly below the price is
    reported as support and the nearest level strictly above as resistance;
    "N/A" is shown when no level qualifies. Symbols with no pivot data or
    without a positive, numeric current price are skipped.
    """
    lines = ["## 關鍵支撐/阻力 (日線樞紐)"]
    for sym in sorted(pivots_by_symbol):
        levels = pivots_by_symbol[sym]
        if not levels:
            continue

        price = current_prices.get(sym)
        # `not price > 0` also rejects NaN, which `price <= 0` would let through.
        if price is None or not price > 0:
            continue

        name = config.SYMBOL_NAMES.get(sym, sym)
        supports = [
            ("S3", levels["s3"]),
            ("S2", levels["s2"]),
            ("S1", levels["s1"]),
            ("P", levels["pivot"]),
        ]
        resistances = [
            ("P", levels["pivot"]),
            ("R1", levels["r1"]),
            ("R2", levels["r2"]),
            ("R3", levels["r3"]),
        ]

        nearest_sup = max(
            (item for item in supports if item[1] < price),
            key=lambda item: item[1],
            default=None,
        )
        nearest_res = min(
            (item for item in resistances if item[1] > price),
            key=lambda item: item[1],
            default=None,
        )

        sup_str = f"{nearest_sup[0]}={nearest_sup[1]:.6g}" if nearest_sup else "N/A"
        res_str = f"{nearest_res[0]}={nearest_res[1]:.6g}" if nearest_res else "N/A"
        lines.append(f"- {name}: 支撐 {sup_str} | 阻力 {res_str}")

    return "\n".join(lines) + "\n"


def summarize_all(indicators_by_symbol: dict[str, pd.DataFrame]) -> str:
    """Build a combined indicator summary for all symbols, in sorted symbol order."""
    return "\n".join(
        summarize_indicators(indicators_by_symbol[sym], sym)
        for sym in sorted(indicators_by_symbol)
    )