"""Fetch and cache historical candle data from Bitfinex public API.""" import logging import os import time import pandas as pd import requests logger = logging.getLogger(__name__) CACHE_DIR = os.path.join(os.path.dirname(__file__), "..", "cache", "backtest") BFX_BASE_URL = "https://api-pub.bitfinex.com" MAX_CANDLES_PER_REQUEST = 10000 def _fetch_candles_page(symbol: str, timeframe: str, start_ms: int, end_ms: int, limit: int = MAX_CANDLES_PER_REQUEST) -> list: url = f"{BFX_BASE_URL}/v2/candles/trade:{timeframe}:{symbol}/hist" params = {"start": start_ms, "end": end_ms, "limit": limit, "sort": 1} for attempt in range(5): resp = requests.get(url, params=params, timeout=30) if resp.status_code == 429: wait = 2 ** attempt + 1 logger.warning("Rate limited, waiting %ds...", wait) time.sleep(wait) continue resp.raise_for_status() return resp.json() resp.raise_for_status() return [] def fetch_historical_candles(symbol: str, timeframe: str, start_ms: int, end_ms: int) -> pd.DataFrame: """Fetch candles with pagination. Returns full DataFrame sorted ascending.""" all_candles = [] current_start = start_ms while current_start < end_ms: raw = _fetch_candles_page(symbol, timeframe, current_start, end_ms) if not raw: break all_candles.extend(raw) last_ts = raw[-1][0] if last_ts <= current_start: break current_start = last_ts + 1 time.sleep(1.5) if not all_candles: return pd.DataFrame() df = pd.DataFrame(all_candles, columns=["timestamp", "open", "close", "high", "low", "volume"]) df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True) return df def _cache_path(symbol: str, timeframe: str) -> str: os.makedirs(CACHE_DIR, exist_ok=True) safe_sym = symbol.replace(":", "_") return os.path.join(CACHE_DIR, f"{safe_sym}_{timeframe}.csv") def load_or_fetch_perp(start_date: str, end_date: str) -> pd.DataFrame: """Load BTC perpetual (tBTCF0:USTF0) 1h candles for basis/funding calculation. Returns DataFrame with columns: timestamp, open, close, high, low, volume. Returns empty DataFrame on failure (graceful degradation). """ symbol = "tBTCF0:USTF0" start_ms = int(pd.Timestamp(start_date).timestamp() * 1000) end_ms = int(pd.Timestamp(end_date).timestamp() * 1000) cache = _cache_path(symbol, "1h") range_hours = (end_ms - start_ms) / 3_600_000 min_expected = int(range_hours * 0.5) if os.path.exists(cache): df = pd.read_csv(cache, parse_dates=["timestamp"]) if len(df) >= min_expected: logger.info("Loaded perp %s from cache (%d candles)", symbol, len(df)) return df logger.info("Perp cache too small (%d < %d), re-fetching", len(df), min_expected) os.remove(cache) try: logger.info("Fetching %s 1h from Bitfinex...", symbol) df = fetch_historical_candles(symbol, "1h", start_ms, end_ms) if not df.empty: df.to_csv(cache, index=False) logger.info("Fetched %s 1h: %d candles", symbol, len(df)) return df except Exception as e: logger.warning("Failed to fetch perp data %s: %s (continuing without)", symbol, e) return pd.DataFrame() def load_or_fetch(symbols: list[str], start_date: str, end_date: str) -> dict[str, dict]: """Load from cache if available, otherwise fetch and cache. Returns {symbol: {"candles_5m": DataFrame, "candles_1h": DataFrame}}. """ start_ms = int(pd.Timestamp(start_date).timestamp() * 1000) end_ms = int(pd.Timestamp(end_date).timestamp() * 1000) # Calculate expected minimum candle count for the date range range_hours = (end_ms - start_ms) / 3_600_000 min_5m = int(range_hours * 12 * 0.5) # at least 50% of expected min_1h = int(range_hours * 0.5) data = {} for sym in symbols: result = {} for tf in ("5m", "1h"): cache = _cache_path(sym, tf) min_expected = min_5m if tf == "5m" else min_1h df = pd.DataFrame() if os.path.exists(cache): df = pd.read_csv(cache, parse_dates=["timestamp"]) if len(df) < min_expected: logger.info("Cache %s %s too small (%d < %d), re-fetching", sym, tf, len(df), min_expected) os.remove(cache) df = pd.DataFrame() else: logger.info("Loaded %s %s from cache (%d candles)", sym, tf, len(df)) if df.empty: try: logger.info("Fetching %s %s from Bitfinex...", sym, tf) df = fetch_historical_candles(sym, tf, start_ms, end_ms) if not df.empty: df.to_csv(cache, index=False) logger.info("Fetched %s %s: %d candles", sym, tf, len(df)) time.sleep(1.5) except Exception as e: logger.warning("Failed to fetch %s %s: %s (skipping)", sym, tf, e) df = pd.DataFrame() result[f"candles_{tf}"] = df data[sym] = result return data