diff --git a/backtest/data_loader.py b/backtest/data_loader.py new file mode 100644 index 0000000..d171ff7 --- /dev/null +++ b/backtest/data_loader.py @@ -0,0 +1,143 @@ +"""Fetch and cache historical candle data from Bitfinex public API.""" + +import logging +import os +import time + +import pandas as pd +import requests + +logger = logging.getLogger(__name__) + +CACHE_DIR = os.path.join(os.path.dirname(__file__), "..", "cache", "backtest") +BFX_BASE_URL = "https://api-pub.bitfinex.com" +MAX_CANDLES_PER_REQUEST = 10000 + + +def _fetch_candles_page(symbol: str, timeframe: str, start_ms: int, end_ms: int, limit: int = MAX_CANDLES_PER_REQUEST) -> list: + url = f"{BFX_BASE_URL}/v2/candles/trade:{timeframe}:{symbol}/hist" + params = {"start": start_ms, "end": end_ms, "limit": limit, "sort": 1} + for attempt in range(5): + resp = requests.get(url, params=params, timeout=30) + if resp.status_code == 429: + wait = 2 ** attempt + 1 + logger.warning("Rate limited, waiting %ds...", wait) + time.sleep(wait) + continue + resp.raise_for_status() + return resp.json() + resp.raise_for_status() + return [] + + +def fetch_historical_candles(symbol: str, timeframe: str, start_ms: int, end_ms: int) -> pd.DataFrame: + """Fetch candles with pagination. Returns full DataFrame sorted ascending.""" + all_candles = [] + current_start = start_ms + + while current_start < end_ms: + raw = _fetch_candles_page(symbol, timeframe, current_start, end_ms) + if not raw: + break + all_candles.extend(raw) + last_ts = raw[-1][0] + if last_ts <= current_start: + break + current_start = last_ts + 1 + time.sleep(1.5) + + if not all_candles: + return pd.DataFrame() + + df = pd.DataFrame(all_candles, columns=["timestamp", "open", "close", "high", "low", "volume"]) + df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") + df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True) + return df + + +def _cache_path(symbol: str, timeframe: str) -> str: + os.makedirs(CACHE_DIR, exist_ok=True) + safe_sym = symbol.replace(":", "_") + return os.path.join(CACHE_DIR, f"{safe_sym}_{timeframe}.csv") + + +def load_or_fetch_perp(start_date: str, end_date: str) -> pd.DataFrame: + """Load BTC perpetual (tBTCF0:USTF0) 1h candles for basis/funding calculation. + + Returns DataFrame with columns: timestamp, open, close, high, low, volume. + Returns empty DataFrame on failure (graceful degradation). + """ + symbol = "tBTCF0:USTF0" + start_ms = int(pd.Timestamp(start_date).timestamp() * 1000) + end_ms = int(pd.Timestamp(end_date).timestamp() * 1000) + + cache = _cache_path(symbol, "1h") + range_hours = (end_ms - start_ms) / 3_600_000 + min_expected = int(range_hours * 0.5) + + if os.path.exists(cache): + df = pd.read_csv(cache, parse_dates=["timestamp"]) + if len(df) >= min_expected: + logger.info("Loaded perp %s from cache (%d candles)", symbol, len(df)) + return df + logger.info("Perp cache too small (%d < %d), re-fetching", len(df), min_expected) + os.remove(cache) + + try: + logger.info("Fetching %s 1h from Bitfinex...", symbol) + df = fetch_historical_candles(symbol, "1h", start_ms, end_ms) + if not df.empty: + df.to_csv(cache, index=False) + logger.info("Fetched %s 1h: %d candles", symbol, len(df)) + return df + except Exception as e: + logger.warning("Failed to fetch perp data %s: %s (continuing without)", symbol, e) + return pd.DataFrame() + + +def load_or_fetch(symbols: list[str], start_date: str, end_date: str) -> dict[str, dict]: + """Load from cache if available, otherwise fetch and cache. + + Returns {symbol: {"candles_5m": DataFrame, "candles_1h": DataFrame}}. + """ + start_ms = int(pd.Timestamp(start_date).timestamp() * 1000) + end_ms = int(pd.Timestamp(end_date).timestamp() * 1000) + + # Calculate expected minimum candle count for the date range + range_hours = (end_ms - start_ms) / 3_600_000 + min_5m = int(range_hours * 12 * 0.5) # at least 50% of expected + min_1h = int(range_hours * 0.5) + + data = {} + for sym in symbols: + result = {} + for tf in ("5m", "1h"): + cache = _cache_path(sym, tf) + min_expected = min_5m if tf == "5m" else min_1h + df = pd.DataFrame() + + if os.path.exists(cache): + df = pd.read_csv(cache, parse_dates=["timestamp"]) + if len(df) < min_expected: + logger.info("Cache %s %s too small (%d < %d), re-fetching", sym, tf, len(df), min_expected) + os.remove(cache) + df = pd.DataFrame() + else: + logger.info("Loaded %s %s from cache (%d candles)", sym, tf, len(df)) + + if df.empty: + try: + logger.info("Fetching %s %s from Bitfinex...", sym, tf) + df = fetch_historical_candles(sym, tf, start_ms, end_ms) + if not df.empty: + df.to_csv(cache, index=False) + logger.info("Fetched %s %s: %d candles", sym, tf, len(df)) + time.sleep(1.5) + except Exception as e: + logger.warning("Failed to fetch %s %s: %s (skipping)", sym, tf, e) + df = pd.DataFrame() + + result[f"candles_{tf}"] = df + data[sym] = result + + return data diff --git a/backtest/engine.py b/backtest/engine.py new file mode 100644 index 0000000..4e8d950 --- /dev/null +++ b/backtest/engine.py @@ -0,0 +1,618 @@ +"""Backtesting simulation engine.""" + +import logging +import sys +import os + +import numpy as np +import pandas as pd + +# Add parent dir to path so we can import project modules +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import config +import indicators +import risk_manager +from backtest import signal_generator + +logger = logging.getLogger(__name__) + +WARMUP_BARS = 100 +FEE_PCT = 0.003 # 0.2% fee + 0.1% slippage + +# Backtest-specific overrides (tighter for ranging markets) +BT_ATR_SL_MIN_PCT = 0.02 # 3% → 2% +BT_ATR_SL_MAX_PCT = 0.05 # 8% → 5% + +# Trailing stop thresholds +TRAILING_BREAKEVEN_PCT = 0.03 # ≥3% profit: move stop to entry (breakeven) +TRAILING_ATR_PCT = 0.05 # ≥5% profit: trailing stop = peak - 2×ATR + +# Position management +MAX_CONCURRENT_SYMBOLS = 3 # max 3 symbols at once +MAX_ADDON_PER_SYMBOL = 1 # max 1 add-on per symbol +BUY_COOLDOWN_BARS = 48 # 4 hours cooldown after buy +TIMEOUT_UNCONDITIONAL_BARS = 864 # 72h: close unconditionally +MIN_PROFIT_FOR_SIGNAL_SELL = 0.03 # 3% min profit for signal sell +MAX_DAILY_BUYS_PER_SYMBOL = 1 # max 1 new position per symbol per day + + +def _calc_atr_stop_price(entry_price: float, atr: float) -> float: + atr_pct = (atr * config.ATR_SL_MULTIPLIER) / entry_price + bounded_pct = max(BT_ATR_SL_MIN_PCT, min(BT_ATR_SL_MAX_PCT, atr_pct)) + return round(entry_price * (1 - bounded_pct), 8) + + +def _calc_all_daily_pivots(htf_df: pd.DataFrame) -> dict: + """Calculate pivot points for each day from 1h candles. + + Returns {date_str: {"pivot": ..., "r1": ..., "s1": ..., ...}}. + """ + if htf_df.empty or len(htf_df) < 24: + return {} + + df = htf_df.copy() + df["date"] = df["timestamp"].dt.date + pivots_by_date = {} + + dates = sorted(df["date"].unique()) + for i in range(1, len(dates)): + prev_day = df[df["date"] == dates[i - 1]] + if prev_day.empty: + continue + h = prev_day["high"].max() + l = prev_day["low"].min() + c = prev_day["close"].iloc[-1] + pivot = (h + l + c) / 3 + pivots_by_date[str(dates[i])] = { + "pivot": pivot, + "r1": 2 * pivot - l, + "r2": pivot + (h - l), + "r3": h + 2 * (pivot - l), + "s1": 2 * pivot - h, + "s2": pivot - (h - l), + "s3": l - 2 * (pivot - l), + } + + return pivots_by_date + + +class BacktestEngine: + def __init__(self, starting_capital: float = 10000.0): + self.starting_capital = starting_capital + self.portfolio = { + "total_balance_usdt": starting_capital, + "available_usdt": starting_capital, + "positions": {}, + "trade_history": [], + "initial_capital": starting_capital, + } + self.stop_orders = {} # symbol -> {"stop_price", "entry_price", "amount"} + self.trade_log = [] # all executed trades + self.equity_curve = [] # (timestamp, equity) + self.buy_cooldown = {} # symbol -> bar_index of last buy + self.addon_count = {} # symbol -> number of add-ons + self.entry_bar = {} # symbol -> bar_index of first entry + self.trailing_stop = {} # symbol -> peak_price for trailing stop + self.daily_buys = {} # (symbol, date_str) -> count + + def run(self, data: dict[str, dict], btc_perp_1h: pd.DataFrame | None = None, + use_context: bool = True) -> dict: + """Run backtest on historical data. + + Args: + data: {symbol: {"candles_5m": DataFrame, "candles_1h": DataFrame}} + btc_perp_1h: BTC perpetual 1h candles for funding/basis calculation. + use_context: If False, disable all context filters (for A/B testing). + + Returns: + Results dict with metrics, trades, equity curve. + """ + # Step 1: Pre-compute indicators for all symbols + ind_5m = {} + ind_1h = {} + pivots_by_sym = {} + + for sym, d in data.items(): + df_5m = d["candles_5m"] + df_1h = d["candles_1h"] + + if df_5m.empty or len(df_5m) < WARMUP_BARS: + logger.warning("Skipping %s: insufficient 5m data (%d bars)", sym, len(df_5m)) + continue + + ind_5m[sym] = indicators.calculate_indicators(df_5m) + + if not df_1h.empty and len(df_1h) >= 30: + ind_1h[sym] = indicators.calculate_htf_indicators(df_1h) + pivots_by_sym[sym] = _calc_all_daily_pivots(df_1h) + else: + ind_1h[sym] = pd.DataFrame() + pivots_by_sym[sym] = {} + + if not ind_5m: + logger.error("No symbols with sufficient data") + return self._build_results() + + # Step 2: Build unified 5m timeline (use longest series) + ref_sym = max(ind_5m, key=lambda s: len(ind_5m[s])) + timeline = ind_5m[ref_sym]["timestamp"].values + + # Pre-extract 1h timestamps for searchsorted + htf_ts = {} + for sym in ind_1h: + if not ind_1h[sym].empty: + htf_ts[sym] = ind_1h[sym]["timestamp"].values + + # Pre-extract BTC 1h and perp timestamps for context + btc_sym = "tBTCUST" + btc_htf_ts = htf_ts.get(btc_sym) + btc_htf_df = ind_1h.get(btc_sym) + + perp_ts = None + perp_df = btc_perp_1h + if perp_df is not None and not perp_df.empty: + perp_ts = perp_df["timestamp"].values + + # Step 3: Iterate + total_bars = len(timeline) + log_interval = max(1, total_bars // 20) + + for i in range(WARMUP_BARS, total_bars): + ts = timeline[i] + if i % log_interval == 0: + pct = i / total_bars * 100 + logger.info("Progress: %.0f%% (%s)", pct, pd.Timestamp(ts)) + + current_prices = {} + signals = [] + + # --- Build context dict (before symbol loop) --- + context = None + if use_context: + context = {} + + # BTC trend context + if btc_htf_ts is not None and btc_htf_df is not None and len(btc_htf_ts) > 0: + btc_idx = np.searchsorted(btc_htf_ts, ts, side="right") - 1 + if btc_idx >= 0: + btc_row = btc_htf_df.iloc[btc_idx] + btc_ema9 = btc_row.get("ema9", 0) + btc_ema21 = btc_row.get("ema21", 0) + btc_adx = btc_row.get("adx", 0) + ema9_gt_ema21 = btc_ema9 > btc_ema21 if pd.notna(btc_ema9) and pd.notna(btc_ema21) else True + btc_adx_val = btc_adx if pd.notna(btc_adx) else 0 + context["btc_trend"] = { + "ema9_gt_ema21": ema9_gt_ema21, + "adx": btc_adx_val, + "bearish_confirmed": (not ema9_gt_ema21) and btc_adx_val > 20, + } + + # Funding sentiment (BTC perp basis) + if perp_ts is not None and btc_htf_ts is not None: + perp_idx = np.searchsorted(perp_ts, ts, side="right") - 1 + btc_spot_idx = np.searchsorted(btc_htf_ts, ts, side="right") - 1 + if perp_idx >= 0 and btc_spot_idx >= 0: + perp_close = perp_df.iloc[perp_idx].get("close", 0) + spot_close = btc_htf_df.iloc[btc_spot_idx].get("close", 0) + if spot_close > 0 and perp_close > 0: + basis_pct = (perp_close - spot_close) / spot_close + if basis_pct > 0.0005: + signal_label = "overleveraged_long" + elif basis_pct < -0.0005: + signal_label = "fearful" + else: + signal_label = "neutral" + context["funding_sentiment"] = { + "basis_pct": basis_pct, + "signal": signal_label, + } + + for sym in ind_5m: + df = ind_5m[sym] + if i >= len(df): + continue + + row = df.iloc[i] + close = row.get("close", 0) + if close <= 0: + continue + current_prices[sym] = close + + # Check stop-loss (including trailing stop) + if sym in self.stop_orders: + low = row.get("low", close) + stop = self.stop_orders[sym] + if low <= stop["stop_price"]: + # Determine if this was a trailing stop exit + exit_type = "stop_loss" + if sym in self.trailing_stop: + exit_type = "trailing_stop" + self._execute_stop_loss(sym, stop, ts, exit_type) + continue + + # Update trailing stop for open positions + pos = self.portfolio.get("positions", {}).get(sym, {}) + if pos.get("amount", 0) > 0 and sym in self.stop_orders: + entry = pos.get("entry_price", 0) + high = row.get("high", close) + atr = row.get("atr", 0) + + if entry > 0: + # Track peak price + if sym not in self.trailing_stop: + self.trailing_stop[sym] = high + else: + self.trailing_stop[sym] = max(self.trailing_stop[sym], high) + + peak = self.trailing_stop[sym] + profit_pct = (peak - entry) / entry + + new_stop = self.stop_orders[sym]["stop_price"] + + if profit_pct >= TRAILING_ATR_PCT and atr > 0: + # ≥5% profit: trailing stop = peak - 2×ATR + trail_stop = peak - 2 * atr + new_stop = max(new_stop, trail_stop) + elif profit_pct >= TRAILING_BREAKEVEN_PCT: + # ≥3% profit: move stop to entry (breakeven) + new_stop = max(new_stop, entry) + + if new_stop > self.stop_orders[sym]["stop_price"]: + self.stop_orders[sym]["stop_price"] = round(new_stop, 8) + + # Check position timeout (72h unconditional) + if pos.get("amount", 0) > 0 and sym in self.entry_bar: + bars_held = i - self.entry_bar[sym] + if bars_held >= TIMEOUT_UNCONDITIONAL_BARS: + self._execute_timeout(sym, pos, close, ts, "timeout_72h") + continue + + # Generate signal + if i < 1: + continue + prev_row = df.iloc[i - 1] + + # Find latest 1h context + htf_last = None + if sym in htf_ts and len(htf_ts[sym]) > 0: + idx = np.searchsorted(htf_ts[sym], ts, side="right") - 1 + if idx >= 0: + htf_last = ind_1h[sym].iloc[idx] + + # Find today's pivots + ts_date = str(pd.Timestamp(ts).date()) + pivots = pivots_by_sym.get(sym, {}).get(ts_date) + + has_pos = self.portfolio.get("positions", {}).get(sym, {}).get("amount", 0) > 0 + + # Add per-symbol buy_pressure to context + sym_context = context + if context is not None: + sym_context = dict(context) + sym_context["buy_pressure"] = row.get("buy_pressure", 0.5) + + sig = signal_generator.generate_signal(sym, row, prev_row, htf_last, pivots, + has_position=has_pos, context=sym_context) + if sig["action"] != "HOLD": + # Attach ATR for stop-loss calculation + sig["atr"] = row.get("atr", 0) + # Apply cooldown filter for BUY signals + if sig["action"] == "BUY" and sym in self.buy_cooldown: + if i - self.buy_cooldown[sym] < BUY_COOLDOWN_BARS: + continue + # Apply min profit filter for signal SELL + if sig["action"] == "SELL" and has_pos: + entry = pos.get("entry_price", 0) + pnl_pct = (close - entry) / entry if entry > 0 else 0 + # Allow sell if 1h strong bearish, otherwise require min profit + if pnl_pct < MIN_PROFIT_FOR_SIGNAL_SELL: + htf_last_check = None + if sym in htf_ts and len(htf_ts[sym]) > 0: + idx = np.searchsorted(htf_ts[sym], ts, side="right") - 1 + if idx >= 0: + htf_last_check = ind_1h[sym].iloc[idx] + is_strong_bearish = (htf_last_check is not None + and pd.notna(htf_last_check.get("ema9")) + and htf_last_check["ema9"] < htf_last_check["ema21"] + and htf_last_check.get("adx", 0) > 25) + if not is_strong_bearish: + continue + signals.append(sig) + + # Sort signals by confidence (high first) to prioritize capital allocation + signals.sort(key=lambda s: s.get("confidence", 0), reverse=True) + + for sig in signals: + self._process_signal(sig, current_prices, ts, i) + + # Record equity + equity = self.portfolio["available_usdt"] + for sym, pos in self.portfolio.get("positions", {}).items(): + if pos.get("amount", 0) > 0: + px = current_prices.get(sym, pos.get("entry_price", 0)) + equity += pos["amount"] * px + self.equity_curve.append((ts, equity)) + + return self._build_results() + + def _execute_stop_loss(self, sym: str, stop: dict, ts, exit_type: str = "stop_loss"): + amount = stop["amount"] + stop_price = stop["stop_price"] + sell_price = stop_price * (1 - FEE_PCT) + entry = stop["entry_price"] + + realized_pnl = amount * sell_price - amount * entry + realized_pnl_pct = (sell_price - entry) / entry * 100 if entry > 0 else 0 + + # Update portfolio + self.portfolio["available_usdt"] += amount * sell_price + self.portfolio["positions"].pop(sym, None) + del self.stop_orders[sym] + self.entry_bar.pop(sym, None) + self.addon_count.pop(sym, None) + self.trailing_stop.pop(sym, None) + + self.trade_log.append({ + "timestamp": str(pd.Timestamp(ts)), + "symbol": sym, + "action": "SELL", + "type": exit_type, + "amount": amount, + "price": stop_price, + "effective_price": sell_price, + "amount_usdt": amount * sell_price, + "entry_price": entry, + "realized_pnl": realized_pnl, + "realized_pnl_pct": realized_pnl_pct, + }) + + def _execute_timeout(self, sym: str, pos: dict, close: float, ts, timeout_type: str): + amount = pos["amount"] + entry = pos["entry_price"] + sell_price = close * (1 - FEE_PCT) + + realized_pnl = amount * sell_price - amount * entry + realized_pnl_pct = (sell_price - entry) / entry * 100 if entry > 0 else 0 + + self.portfolio["available_usdt"] += amount * sell_price + self.portfolio["positions"].pop(sym, None) + self.stop_orders.pop(sym, None) + self.entry_bar.pop(sym, None) + self.addon_count.pop(sym, None) + self.trailing_stop.pop(sym, None) + + self.trade_log.append({ + "timestamp": str(pd.Timestamp(ts)), + "symbol": sym, + "action": "SELL", + "type": timeout_type, + "amount": amount, + "price": close, + "effective_price": sell_price, + "amount_usdt": amount * sell_price, + "entry_price": entry, + "realized_pnl": realized_pnl, + "realized_pnl_pct": realized_pnl_pct, + }) + + def _execute_take_profit(self, sym: str, pos: dict, tp_price: float, ts): + amount = pos["amount"] + entry = pos["entry_price"] + sell_price = tp_price * (1 - FEE_PCT) + + realized_pnl = amount * sell_price - amount * entry + realized_pnl_pct = (sell_price - entry) / entry * 100 if entry > 0 else 0 + + self.portfolio["available_usdt"] += amount * sell_price + self.portfolio["positions"].pop(sym, None) + self.stop_orders.pop(sym, None) + self.entry_bar.pop(sym, None) + self.addon_count.pop(sym, None) + self.trailing_stop.pop(sym, None) + + self.trade_log.append({ + "timestamp": str(pd.Timestamp(ts)), + "symbol": sym, + "action": "SELL", + "type": "take_profit", + "amount": amount, + "price": tp_price, + "effective_price": sell_price, + "amount_usdt": amount * sell_price, + "entry_price": entry, + "realized_pnl": realized_pnl, + "realized_pnl_pct": realized_pnl_pct, + }) + + def _process_signal(self, signal: dict, current_prices: dict, ts, bar_index: int): + action = signal["action"] + sym = signal["symbol"] + + # Build indicators_df stub for risk_manager (it reads iloc[-1] for ATR/close) + price = current_prices.get(sym, 0) + if price <= 0: + return + + validated, reject_reason = risk_manager.validate_trade(signal, self.portfolio, None, current_prices) + if validated is None: + return + + if action == "BUY": + # Enforce daily buy limit per symbol + ts_date = str(pd.Timestamp(ts).date()) + daily_key = (sym, ts_date) + if self.daily_buys.get(daily_key, 0) >= MAX_DAILY_BUYS_PER_SYMBOL: + return + + # Enforce max concurrent symbols + active_positions = {s for s, p in self.portfolio.get("positions", {}).items() + if p.get("amount", 0) > 0} + if sym not in active_positions and len(active_positions) >= MAX_CONCURRENT_SYMBOLS: + return + + # Enforce max add-on per symbol + if sym in active_positions and self.addon_count.get(sym, 0) >= MAX_ADDON_PER_SYMBOL: + return + + amount_usdt = validated.get("amount_usdt", 0) + buy_price = price * (1 + FEE_PCT) + amount = amount_usdt / buy_price + + # Check minimum order amount + min_amt = config.MIN_ORDER_AMOUNT.get(sym, 0) + if min_amt > 0 and amount < min_amt: + return + + if amount_usdt > self.portfolio["available_usdt"]: + return + + # Update portfolio + pos = self.portfolio.get("positions", {}).get(sym, {}) + old_amt = pos.get("amount", 0) + old_entry = pos.get("entry_price", 0) + new_amt = old_amt + amount + new_entry = ((old_amt * old_entry) + (amount * buy_price)) / new_amt if new_amt > 0 else buy_price + + self.portfolio.setdefault("positions", {})[sym] = { + "amount": new_amt, + "entry_price": new_entry, + "value_usdt": new_amt * new_entry, + "last_update": float(pd.Timestamp(ts).timestamp()), + } + self.portfolio["available_usdt"] -= amount_usdt + + # Set ATR-based stop-loss + atr = signal.get("atr", 0) + if atr > 0: + stop_price = _calc_atr_stop_price(new_entry, atr) + else: + stop_price = round(new_entry * (1 - BT_ATR_SL_MAX_PCT), 8) + self.stop_orders[sym] = { + "stop_price": stop_price, + "entry_price": new_entry, + "amount": new_amt, + } + # Reset trailing stop tracking on new/add-on entry + self.trailing_stop.pop(sym, None) + + # Track cooldown, addon count, entry bar, daily buys + self.buy_cooldown[sym] = bar_index + self.daily_buys[daily_key] = self.daily_buys.get(daily_key, 0) + 1 + if old_amt > 0: + self.addon_count[sym] = self.addon_count.get(sym, 0) + 1 + else: + self.addon_count[sym] = 0 + self.entry_bar[sym] = bar_index + + self.trade_log.append({ + "timestamp": str(pd.Timestamp(ts)), + "symbol": sym, + "action": "BUY", + "type": "signal", + "amount": amount, + "price": price, + "effective_price": buy_price, + "amount_usdt": amount_usdt, + "confidence": signal.get("confidence", 0), + "reason": signal.get("reason", ""), + "stop_price": stop_price, + }) + + elif action == "SELL": + pos = self.portfolio.get("positions", {}).get(sym, {}) + amount = pos.get("amount", 0) + if amount <= 0: + return + entry = pos.get("entry_price", 0) + sell_price = price * (1 - FEE_PCT) + + realized_pnl = amount * sell_price - amount * entry + realized_pnl_pct = (sell_price - entry) / entry * 100 if entry > 0 else 0 + + self.portfolio["available_usdt"] += amount * sell_price + self.portfolio["positions"].pop(sym, None) + self.stop_orders.pop(sym, None) + self.entry_bar.pop(sym, None) + self.addon_count.pop(sym, None) + self.trailing_stop.pop(sym, None) + + self.trade_log.append({ + "timestamp": str(pd.Timestamp(ts)), + "symbol": sym, + "action": "SELL", + "type": "signal", + "amount": amount, + "price": price, + "effective_price": sell_price, + "amount_usdt": amount * sell_price, + "entry_price": entry, + "realized_pnl": realized_pnl, + "realized_pnl_pct": realized_pnl_pct, + "confidence": signal.get("confidence", 0), + "reason": signal.get("reason", ""), + }) + + def _build_results(self) -> dict: + """Calculate performance metrics.""" + equity_df = pd.DataFrame(self.equity_curve, columns=["timestamp", "equity"]) + trades_df = pd.DataFrame(self.trade_log) if self.trade_log else pd.DataFrame() + + final_equity = equity_df["equity"].iloc[-1] if not equity_df.empty else self.starting_capital + total_return = (final_equity - self.starting_capital) / self.starting_capital * 100 + + # Max drawdown + max_dd = 0.0 + if not equity_df.empty: + peak = equity_df["equity"].expanding().max() + drawdown = (equity_df["equity"] - peak) / peak * 100 + max_dd = drawdown.min() + + # Trade statistics + sells = [t for t in self.trade_log if t["action"] == "SELL"] + buys = [t for t in self.trade_log if t["action"] == "BUY"] + wins = [t for t in sells if t.get("realized_pnl", 0) > 0] + losses = [t for t in sells if t.get("realized_pnl", 0) <= 0] + + win_rate = len(wins) / len(sells) * 100 if sells else 0 + gross_profit = sum(t["realized_pnl"] for t in wins) if wins else 0 + gross_loss = abs(sum(t["realized_pnl"] for t in losses)) if losses else 0 + profit_factor = gross_profit / gross_loss if gross_loss > 0 else float("inf") if gross_profit > 0 else 0 + + avg_win = gross_profit / len(wins) if wins else 0 + avg_loss = gross_loss / len(losses) if losses else 0 + + # Trade type breakdown + sl_trades = [t for t in sells if t.get("type") == "stop_loss"] + tp_trades = [t for t in sells if t.get("type") == "take_profit"] + sig_sells = [t for t in sells if t.get("type") == "signal"] + timeout_trades = [t for t in sells if t.get("type", "").startswith("timeout")] + trailing_trades = [t for t in sells if t.get("type") == "trailing_stop"] + + # Days with exposure + days = (equity_df["timestamp"].iloc[-1] - equity_df["timestamp"].iloc[0]).days if len(equity_df) > 1 else 1 + annualized = total_return * (365 / days) if days > 0 else 0 + + return { + "starting_capital": self.starting_capital, + "final_equity": round(final_equity, 2), + "total_return_pct": round(total_return, 2), + "annualized_return_pct": round(annualized, 2), + "max_drawdown_pct": round(max_dd, 2), + "total_buys": len(buys), + "total_sells": len(sells), + "stop_loss_count": len(sl_trades), + "take_profit_count": len(tp_trades), + "signal_sell_count": len(sig_sells), + "timeout_count": len(timeout_trades), + "trailing_stop_count": len(trailing_trades), + "win_rate_pct": round(win_rate, 2), + "profit_factor": round(profit_factor, 2), + "avg_win_usdt": round(avg_win, 2), + "avg_loss_usdt": round(avg_loss, 2), + "gross_profit": round(gross_profit, 2), + "gross_loss": round(gross_loss, 2), + "days": days, + "trades": self.trade_log, + "equity_curve": self.equity_curve, + } diff --git a/backtest/run_backtest.py b/backtest/run_backtest.py new file mode 100644 index 0000000..3f10607 --- /dev/null +++ b/backtest/run_backtest.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +"""CLI entry point for backtesting.""" + +import argparse +import csv +import json +import logging +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import config +from backtest.data_loader import load_or_fetch, load_or_fetch_perp +from backtest.engine import BacktestEngine + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger("backtest") + +RESULTS_DIR = os.path.join(os.path.dirname(__file__), "results") + + +def print_report(results: dict): + """Print formatted backtest report to stdout.""" + print("\n" + "=" * 60) + print(" BACKTEST RESULTS") + print("=" * 60) + print(f" Starting Capital: {results['starting_capital']:>12,.2f} USDT") + print(f" Final Equity: {results['final_equity']:>12,.2f} USDT") + print(f" Total Return: {results['total_return_pct']:>+11.2f}%") + print(f" Annualized Return: {results['annualized_return_pct']:>+11.2f}%") + print(f" Max Drawdown: {results['max_drawdown_pct']:>11.2f}%") + print(f" Period: {results['days']:>8d} days") + print("-" * 60) + print(f" Total BUYs: {results['total_buys']:>8d}") + print(f" Total SELLs: {results['total_sells']:>8d}") + print(f" - Signal SELL: {results['signal_sell_count']:>8d}") + print(f" - Take Profit: {results['take_profit_count']:>8d}") + print(f" - Trailing Stop: {results.get('trailing_stop_count', 0):>8d}") + print(f" - Stop Loss: {results['stop_loss_count']:>8d}") + print(f" - Timeout: {results.get('timeout_count', 0):>8d}") + print("-" * 60) + print(f" Win Rate: {results['win_rate_pct']:>11.2f}%") + print(f" Profit Factor: {results['profit_factor']:>11.2f}") + print(f" Avg Win: {results['avg_win_usdt']:>+11.2f} USDT") + print(f" Avg Loss: {results['avg_loss_usdt']:>11.2f} USDT") + print(f" Gross Profit: {results['gross_profit']:>+11.2f} USDT") + print(f" Gross Loss: {results['gross_loss']:>11.2f} USDT") + open_positions = results['total_buys'] - results['total_sells'] + print(f" Open Positions: {open_positions:>8d}") + print("=" * 60 + "\n") + + +def save_results(results: dict): + """Save trades CSV, summary JSON, and equity curve CSV.""" + os.makedirs(RESULTS_DIR, exist_ok=True) + + # Summary JSON (without large lists) + summary = {k: v for k, v in results.items() if k not in ("trades", "equity_curve")} + with open(os.path.join(RESULTS_DIR, "summary.json"), "w") as f: + json.dump(summary, f, indent=2) + + # Trades CSV + trades = results.get("trades", []) + if trades: + trades_path = os.path.join(RESULTS_DIR, "trades.csv") + all_keys = set() + for t in trades: + all_keys.update(t.keys()) + keys = sorted(all_keys) + with open(trades_path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=keys, extrasaction="ignore") + writer.writeheader() + writer.writerows(trades) + logger.info("Saved %d trades to %s", len(trades), trades_path) + + # Equity curve CSV + eq = results.get("equity_curve", []) + if eq: + eq_path = os.path.join(RESULTS_DIR, "equity_curve.csv") + with open(eq_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["timestamp", "equity"]) + for ts, val in eq: + writer.writerow([str(ts), f"{val:.2f}"]) + logger.info("Saved equity curve to %s", eq_path) + + +def main(): + parser = argparse.ArgumentParser(description="Backtest trading strategy") + parser.add_argument("--start", default="2026-01-01", help="Start date (YYYY-MM-DD)") + parser.add_argument("--end", default="2026-03-17", help="End date (YYYY-MM-DD)") + parser.add_argument("--capital", type=float, default=10000.0, help="Starting capital (USDT)") + parser.add_argument("--symbols", default=None, help="Comma-separated symbols (default: all 15)") + parser.add_argument("--no-context", action="store_true", help="Disable non-price context filters (for A/B comparison)") + args = parser.parse_args() + + symbols = args.symbols.split(",") if args.symbols else config.TOP_15_SYMBOLS + + logger.info("Backtest: %s to %s, capital=%.2f USDT, %d symbols", + args.start, args.end, args.capital, len(symbols)) + + # Step 1: Load data + logger.info("Loading historical data...") + data = load_or_fetch(symbols, args.start, args.end) + + # Filter symbols with sufficient data + valid = {sym: d for sym, d in data.items() if not d["candles_5m"].empty} + logger.info("Valid symbols: %d / %d", len(valid), len(symbols)) + + if not valid: + logger.error("No valid data found") + return + + # Step 1b: Load BTC perpetual data for funding/basis context + btc_perp_df = None + use_context = not args.no_context + if use_context: + logger.info("Loading BTC perpetual data for funding context...") + btc_perp_df = load_or_fetch_perp(args.start, args.end) + if btc_perp_df is not None and not btc_perp_df.empty: + logger.info("BTC perp data: %d candles", len(btc_perp_df)) + else: + logger.warning("No BTC perp data available — funding context disabled") + else: + logger.info("Context filters disabled (--no-context)") + + # Step 2: Run backtest + logger.info("Running simulation...") + engine = BacktestEngine(starting_capital=args.capital) + results = engine.run(valid, btc_perp_1h=btc_perp_df, use_context=use_context) + + # Step 3: Output + print_report(results) + save_results(results) + + +if __name__ == "__main__": + main() diff --git a/backtest/signal_generator.py b/backtest/signal_generator.py new file mode 100644 index 0000000..c88b1b7 --- /dev/null +++ b/backtest/signal_generator.py @@ -0,0 +1,262 @@ +"""Rule-based signal generator replicating the LLM trading strategy. + +Relaxed version: mirrors actual LLM behavior where volume/OBV are bonus factors +rather than hard filters, deep oversold alone can trigger entry, and bearish +trend reversal triggers protective sells on existing positions. +""" + +import pandas as pd + + +def _near_level(price: float, level: float, tolerance: float = 0.008) -> bool: + """Check if price is within tolerance % of a level.""" + if level <= 0: + return False + return abs(price - level) / level <= tolerance + + +def generate_signal( + symbol: str, + ind_curr: pd.Series, + ind_prev: pd.Series, + htf_last: pd.Series | None, + pivots: dict | None, + has_position: bool = False, + context: dict | None = None, +) -> dict: + """Generate a trading signal based on codified strategy rules. + + Matches real LLM behavior: flexible confirmation counting with bonus scoring + rather than strict AND filters. + """ + hold = {"symbol": symbol, "action": "HOLD", "confidence": 0.0, "reason": "", "suggested_amount_pct": 0.0} + + close = ind_curr.get("close", 0) + if close <= 0 or pd.isna(ind_curr.get("rsi")): + return {**hold, "reason": "insufficient data"} + + # --- Extract 5m indicators --- + rsi = ind_curr.get("rsi", 50) + stoch_k = ind_curr.get("stoch_rsi_k", 0.5) + stoch_d = ind_curr.get("stoch_rsi_d", 0.5) + macd = ind_curr.get("macd", 0) + macd_signal = ind_curr.get("macd_signal", 0) + macd_hist = ind_curr.get("macd_hist", 0) + prev_macd = ind_prev.get("macd", 0) + prev_macd_signal = ind_prev.get("macd_signal", 0) + bb_lower = ind_curr.get("bb_lower", 0) + bb_upper = ind_curr.get("bb_upper", 0) + ema9 = ind_curr.get("ema9", 0) + ema21 = ind_curr.get("ema21", 0) + adx = ind_curr.get("adx", 0) + volume = ind_curr.get("volume", 0) + vol_ma20 = ind_curr.get("vol_ma20", 0) + obv_slope = ind_curr.get("obv_slope", 0) + cmf = ind_curr.get("cmf", 0) + + # --- VWAP filter --- + vwap = ind_curr.get("vwap", 0) + + # --- 1h trend context --- + htf_bullish = False + htf_adx_ok = False + htf_strong = False + htf_bearish_strong = False + # Market regime: trending (ADX≥25), weak_trend (15-25), choppy (<15) + market_regime = "choppy" + if htf_last is not None and pd.notna(htf_last.get("ema9")): + htf_bullish = htf_last["ema9"] > htf_last["ema21"] + htf_adx = htf_last.get("adx", 0) + if pd.notna(htf_adx): + htf_adx_ok = htf_adx >= 20 + htf_strong = htf_adx > 25 + htf_bearish_strong = (not htf_bullish) and htf_adx > 25 + if htf_adx >= 25: + market_regime = "trending" + elif htf_adx >= 15: + market_regime = "weak_trend" + + # --- Pivot levels --- + s1 = pivots.get("s1", 0) if pivots else 0 + s2 = pivots.get("s2", 0) if pivots else 0 + r1 = pivots.get("r1", 0) if pivots else 0 + r2 = pivots.get("r2", 0) if pivots else 0 + + # --- Bonus scoring (not hard filters) --- + volume_bonus = 0.05 if (vol_ma20 > 0 and volume > vol_ma20) else 0 + obv_buy_bonus = 0.05 if obv_slope > 0 else 0 + obv_sell_bonus = 0.05 if obv_slope < 0 else 0 + + # --- Extract context data --- + ctx = context or {} + btc_trend = ctx.get("btc_trend") + buy_pressure = ctx.get("buy_pressure", 0.5) + funding = ctx.get("funding_sentiment") + + # ========== BUY SIGNALS ========== + # Regime filter: choppy market → no buys at all + if market_regime == "choppy": + pass # skip all buy logic + # BTC trend filter: skip altcoin buys during confirmed BTC bearish + elif (btc_trend and btc_trend.get("bearish_confirmed") + and symbol != "tBTCUST"): + pass # skip buy — BTC bearish confirmed + # VWAP filter: don't buy above VWAP (buying above fair value) + elif vwap > 0 and close > vwap: + pass # skip buy + # Require: 1h bullish (core filter) + elif htf_bullish: + buy_confirms = [] + reasons = [] + + # Signal 1: Oversold bounce (tightened: RSI<30 alone no longer triggers) + if stoch_k < 0.2 and rsi < 40: + if stoch_k > stoch_d: + buy_confirms.append(1.0) + reasons.append("超賣反彈(StochRSI K上穿D+RSI)") + else: + buy_confirms.append(0.7) + reasons.append("超賣(StochRSI+RSI)") + + # Signal 2: Mean reversion (BB touch) — allowed in weak_trend regime too + if bb_lower > 0 and close <= bb_lower * 1.005: + if rsi < 40 and cmf > 0: + buy_confirms.append(1.0) + reasons.append("均值回歸(BB+RSI+CMF)") + + # Signal 3: Trend start (MACD golden cross) — only in trending regime + macd_cross_up = (pd.notna(prev_macd) and pd.notna(prev_macd_signal) + and prev_macd <= prev_macd_signal and macd > macd_signal) + prev_macd_hist = ind_prev.get("macd_hist", 0) + if macd_cross_up and ema9 > ema21 and market_regime == "trending": + # MACD histogram direction confirmation: require histogram rising + if pd.notna(prev_macd_hist) and macd_hist > prev_macd_hist: + buy_confirms.append(1.0 if (pd.notna(adx) and adx > 20) else 0.8) + reasons.append("趨勢啟動(MACD金叉+hist↑+EMA)") + else: + buy_confirms.append(0.6 if (pd.notna(adx) and adx > 20) else 0.4) + reasons.append("趨勢啟動(MACD金叉+EMA)") + + # Signal 4: Support bounce — only in trending regime + if market_regime == "trending": + near_support = _near_level(close, s1) or _near_level(close, s2) + if near_support and rsi < 45: + buy_confirms.append(0.7 if obv_slope > 0 else 0.5) + reasons.append("支撐反彈(Pivot+RSI)") + + # In weak_trend, only allow mean reversion (Signal 2) + if market_regime == "weak_trend": + buy_confirms = [c for c, r in zip(buy_confirms, reasons) if "均值回歸" in r] + reasons = [r for r in reasons if "均值回歸" in r] + + # Score: sum of weighted confirmations + total_score = sum(buy_confirms) + n_confirms = len(buy_confirms) + + # Buy pressure filter (OHLCV proxy for order book imbalance) + if buy_pressure < 0.3: + total_score *= 0.5 # heavy sell pressure — halve score + elif buy_pressure < 0.4: + total_score *= 0.8 # moderate sell pressure + + # Funding sentiment filter (overleveraged longs → pullback risk) + if funding and funding.get("signal") == "overleveraged_long": + total_score *= 0.6 + + if total_score >= 1.5 and htf_adx_ok: # require stronger confirmation + trending + base_conf = 0.55 + base_conf += 0.10 if htf_adx_ok else 0 + base_conf += 0.05 if htf_strong else 0 + base_conf += volume_bonus + base_conf += obv_buy_bonus + if n_confirms >= 3: + base_conf += 0.10 + conf = min(0.90, max(0.50, base_conf)) + + return { + "symbol": symbol, + "action": "BUY", + "confidence": round(conf, 2), + "reason": " + ".join(reasons), + "suggested_amount_pct": min(0.20, max(0.05, conf * 0.20)), + } + + # ========== SELL SIGNALS ========== + sell_confirms = [] + sell_reasons = [] + + # Signal 1: Overbought reversal (relaxed: RSI>65 or StochRSI>0.8) + if stoch_k > 0.8 and stoch_k < stoch_d: + if rsi > 70 and macd_hist < 0: + sell_confirms.append(1.0) + sell_reasons.append("超買反轉(StochRSI+RSI+MACD)") + elif rsi > 65: + sell_confirms.append(0.7) + sell_reasons.append("超買(StochRSI+RSI)") + + # Signal 2: Resistance rejection + near_resistance = _near_level(close, r1) or _near_level(close, r2) + if near_resistance and cmf < 0: + sell_confirms.append(0.7 if obv_slope < 0 else 0.5) + sell_reasons.append("阻力拒絕(Pivot+CMF)") + + # Signal 3: Trend reversal (MACD death cross + EMA bearish) + macd_cross_down = (pd.notna(prev_macd) and pd.notna(prev_macd_signal) + and prev_macd >= prev_macd_signal and macd < macd_signal) + if macd_cross_down and ema9 < ema21: + strength = 1.0 if (not htf_bullish) else 0.6 + sell_confirms.append(strength) + sell_reasons.append("趨勢反轉(MACD死叉+EMA)") + + # Signal 4: 1h bearish strong trend + 5m confirming (protective sell) + if htf_bearish_strong and ema9 < ema21 and rsi < 40 and obv_slope < 0: + sell_confirms.append(0.8) + sell_reasons.append("多TF空頭(1h強空+5m空)") + + # Signal 5: OBV massive outflow + CMF bearish (volume divergence, reduced weight) + if obv_slope < 0 and cmf < -0.1 and ema9 < ema21: + sell_confirms.append(0.3) + sell_reasons.append("量價背離(OBV流出+CMF賣壓)") + + # VWAP sell bonus: price significantly above VWAP + if vwap > 0 and close > vwap * 1.01: + sell_confirms.append(0.3) + sell_reasons.append("VWAP溢價") + + # Sell pressure from OHLCV proxy + if buy_pressure < 0.3: + sell_confirms.append(0.3) + sell_reasons.append("賣壓(OHLCV)") + + # Funding: overleveraged longs with high basis → liquidation risk + if funding and funding.get("signal") == "overleveraged_long": + basis_pct = funding.get("basis_pct", 0) + if basis_pct > 0.0008: # > 0.08% + sell_confirms.append(0.4) + sell_reasons.append("資金費率過高") + + total_sell = sum(sell_confirms) + if total_sell >= 1.2 and has_position: + base_conf = 0.55 + if not htf_bullish: + base_conf += 0.10 + if htf_bearish_strong: + base_conf += 0.05 + base_conf += obv_sell_bonus + base_conf += volume_bonus + if len(sell_confirms) >= 3: + base_conf += 0.10 + # If 1h still bullish, penalize + if htf_bullish: + base_conf -= 0.15 + conf = min(0.90, max(0.40, base_conf)) + + return { + "symbol": symbol, + "action": "SELL", + "confidence": round(conf, 2), + "reason": " + ".join(sell_reasons), + "suggested_amount_pct": 1.0, + } + + return {**hold, "reason": "確認不足"} diff --git a/indicators.py b/indicators.py index 27f9f29..3df505c 100644 --- a/indicators.py +++ b/indicators.py @@ -68,6 +68,10 @@ def calculate_indicators(candles_df: pd.DataFrame) -> pd.DataFrame: # CMF (Chaikin Money Flow, 20-period) df["cmf"] = ta.volume.chaikin_money_flow(high, low, close, volume, window=20) + # Buy pressure (order book imbalance proxy from OHLCV) + hl_range = high - low + df["buy_pressure"] = ((close - low) / hl_range).where(hl_range > 0, 0.5) + return df