import hashlib import hmac import json import logging import os import time import pandas as pd import requests import config logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Public API helpers # --------------------------------------------------------------------------- def _pub_get(path: str, params: dict | None = None) -> dict | list: url = f"{config.BFX_BASE_URL}{path}" resp = requests.get(url, params=params, timeout=15) resp.raise_for_status() return resp.json() # --------------------------------------------------------------------------- # Authenticated API helpers # --------------------------------------------------------------------------- def _auth_post(path: str, body: dict | None = None) -> dict | list: body = body or {} nonce = str(int(time.time() * 1000000)) body_json = json.dumps(body) signature_payload = f"/api{path}{nonce}{body_json}" sig = hmac.new( config.BFX_API_SECRET.encode("utf-8"), signature_payload.encode("utf-8"), hashlib.sha384, ).hexdigest() headers = { "bfx-nonce": nonce, "bfx-apikey": config.BFX_API_KEY, "bfx-signature": sig, "content-type": "application/json", } url = f"{config.BFX_AUTH_URL}{path}" resp = requests.post(url, headers=headers, data=body_json, timeout=15) if resp.status_code >= 400: logger.error("API %s returned %d: %s", path, resp.status_code, resp.text[:500]) resp.raise_for_status() return resp.json() # --------------------------------------------------------------------------- # Deposit history (cached locally, refreshed hourly) # --------------------------------------------------------------------------- # 2026-03-10 00:00:00 UTC in milliseconds _DEPOSIT_START_MS = 1773100800000 _DEPOSIT_CACHE_FILE = "deposit_cache.json" _DEPOSIT_CACHE_TTL = 3600 # 1 hour def fetch_total_deposits() -> float: """Return net USDT deposits since 2026-03-10. Uses local cache, refreshes hourly.""" # Try cache first if os.path.exists(_DEPOSIT_CACHE_FILE): try: with open(_DEPOSIT_CACHE_FILE) as f: cache = json.load(f) if time.time() - cache.get("ts", 0) < _DEPOSIT_CACHE_TTL: return cache["total"] except (json.JSONDecodeError, IOError, KeyError): pass # Cache miss or expired — fetch from API total = _fetch_deposits_from_api() if total > 0: try: with open(_DEPOSIT_CACHE_FILE, "w") as f: json.dump({"total": total, "ts": time.time()}, f) except IOError: pass return total def _fetch_deposits_from_api() -> float: """Fetch net USDT deposits from Bitfinex API.""" try: raw = _auth_post("/v2/auth/r/movements/UST/hist", { "start": _DEPOSIT_START_MS, "limit": 100, }) except Exception as e: logger.error("Failed to fetch deposit history: %s", e) return 0.0 total = 0.0 for m in raw: status = m[9] if len(m) > 9 else "" if status != "COMPLETED": continue amount = float(m[12]) if len(m) > 12 else 0 total += amount # positive = deposit, negative = withdrawal return total # --------------------------------------------------------------------------- # Public endpoints # --------------------------------------------------------------------------- def fetch_tickers(symbols: list[str] | None = None) -> dict: """Return {symbol: {bid, ask, last, volume, ...}} for given symbols.""" symbols = symbols or config.TOP_15_SYMBOLS params = {"symbols": ",".join(symbols)} raw = _pub_get("/v2/tickers", params) tickers = {} for t in raw: sym = t[0] tickers[sym] = { "bid": t[1], "bid_size": t[2], "ask": t[3], "ask_size": t[4], "daily_change": t[5], "daily_change_pct": t[6], "last_price": t[7], "volume": t[8], "high": t[9], "low": t[10], } return tickers def fetch_candles(symbol: str, timeframe: str | None = None, limit: int | None = None) -> pd.DataFrame: """Return DataFrame with columns: timestamp, open, close, high, low, volume.""" timeframe = timeframe or config.CANDLE_TIMEFRAME limit = limit or config.CANDLE_LIMIT path = f"/v2/candles/trade:{timeframe}:{symbol}/hist" raw = _pub_get(path, {"limit": limit, "sort": -1}) if not raw: return pd.DataFrame() df = pd.DataFrame(raw, columns=["timestamp", "open", "close", "high", "low", "volume"]) df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") return df def fetch_order_book(symbol: str, precision: str = "P0") -> dict: """Return raw order book entries.""" path = f"/v2/book/{symbol}/{precision}" raw = _pub_get(path, {"len": 25}) bids, asks = [], [] for entry in raw: price, count, amount = entry[0], entry[1], entry[2] if amount > 0: bids.append({"price": price, "count": count, "amount": amount}) else: asks.append({"price": price, "count": count, "amount": abs(amount)}) return {"bids": bids, "asks": asks} # --------------------------------------------------------------------------- # Authenticated endpoints # --------------------------------------------------------------------------- def fetch_account_status() -> dict: """Return wallet balances and active positions from Bitfinex.""" if config.PAPER_TRADING and not config.BFX_API_KEY: return {"wallets": [], "positions": []} try: wallets_raw = _auth_post("/v2/auth/r/wallets") wallets = [] for w in wallets_raw: wallets.append({ "type": w[0], "currency": w[1], "balance": w[2], "unsettled": w[3], "available": w[4], }) positions_raw = _auth_post("/v2/auth/r/positions") positions = [] for p in positions_raw: positions.append({ "symbol": p[0], "status": p[1], "amount": p[2], "base_price": p[3], "pnl": p[6] if len(p) > 6 else 0, }) return {"wallets": wallets, "positions": positions} except Exception as e: logger.error("Failed to fetch account status: %s", e) return {"wallets": [], "positions": []} def fetch_active_orders() -> list[dict]: """Return list of active orders.""" if config.PAPER_TRADING and not config.BFX_API_KEY: return [] try: raw = _auth_post("/v2/auth/r/orders") orders = [] for o in raw: orders.append({ "id": o[0], "symbol": o[3], "amount": o[6], "price": o[16], "status": o[13], "type": o[8], }) return orders except Exception as e: logger.error("Failed to fetch active orders: %s", e) return [] # --------------------------------------------------------------------------- # Aggregation # --------------------------------------------------------------------------- def fetch_all_market_data(symbols: list[str] | None = None) -> dict: """Fetch tickers + candles (5m and 1h) for all symbols. Returns dict keyed by symbol.""" symbols = symbols or config.TOP_15_SYMBOLS tickers = fetch_tickers(symbols) data = {} for sym in symbols: try: candles = fetch_candles(sym) candles_htf = fetch_candles(sym, timeframe=config.HTF_TIMEFRAME, limit=config.HTF_CANDLE_LIMIT) data[sym] = { "ticker": tickers.get(sym, {}), "candles": candles, "candles_htf": candles_htf, } time.sleep(0.15) # Avoid Bitfinex rate limits except Exception as e: logger.warning("Failed to fetch data for %s: %s", sym, e) return data # --------------------------------------------------------------------------- # Cache helpers # --------------------------------------------------------------------------- def _ensure_cache_dirs(): os.makedirs(config.CANDLES_CACHE_DIR, exist_ok=True) os.makedirs(config.INDICATORS_CACHE_DIR, exist_ok=True) def cache_market_data(market_data: dict, indicators_data: dict | None = None): """Write candle and indicator data to cache/ as JSON files.""" _ensure_cache_dirs() for sym, md in market_data.items(): candles_df = md.get("candles") if candles_df is not None and not candles_df.empty: path = os.path.join(config.CANDLES_CACHE_DIR, f"{sym}.json") candles_df.to_json(path, orient="records", date_format="iso") if indicators_data: for sym, ind_df in indicators_data.items(): if ind_df is not None and not ind_df.empty: path = os.path.join(config.INDICATORS_CACHE_DIR, f"{sym}.json") ind_df.to_json(path, orient="records", date_format="iso") def load_cached_data() -> tuple[dict, dict]: """Load cached candles and indicators. Returns (candles_dict, indicators_dict).""" _ensure_cache_dirs() candles, indicators = {}, {} for fname in os.listdir(config.CANDLES_CACHE_DIR): if fname.endswith(".json"): sym = fname[:-5] path = os.path.join(config.CANDLES_CACHE_DIR, fname) candles[sym] = pd.read_json(path, orient="records") for fname in os.listdir(config.INDICATORS_CACHE_DIR): if fname.endswith(".json"): sym = fname[:-5] path = os.path.join(config.INDICATORS_CACHE_DIR, fname) indicators[sym] = pd.read_json(path, orient="records") return candles, indicators