bifitnex-trading/data_fetcher.py
kroutony 1abfdefecd Add advanced indicators: MTF analysis, ADX, StochRSI, OBV, CMF, pivot points
- config.py: Add HTF_TIMEFRAME (1h) and HTF_CANDLE_LIMIT (50)
- data_fetcher.py: Fetch both 5m and 1h candles per symbol
- indicators.py: Add ADX, StochRSI, OBV+slope, CMF to 5m indicators;
  new functions for HTF indicators, pivot points, and their summaries
- main.py: Wire up HTF data flow (1h indicators + pivots → LLM summary)
- llm_analyzer.py: Rewrite prompt with MTF filter (1h trend alignment),
  require 2+ confirmations for BUY, confidence scoring guide

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-14 03:26:59 +00:00

230 lines
8.0 KiB
Python

import hashlib
import hmac
import json
import logging
import os
import time
import pandas as pd
import requests
import config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Public API helpers
# ---------------------------------------------------------------------------
def _pub_get(path: str, params: dict | None = None) -> dict | list:
url = f"{config.BFX_BASE_URL}{path}"
resp = requests.get(url, params=params, timeout=15)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Authenticated API helpers
# ---------------------------------------------------------------------------
def _auth_post(path: str, body: dict | None = None) -> dict | list:
body = body or {}
nonce = str(int(time.time() * 1000000))
body_json = json.dumps(body)
signature_payload = f"/api{path}{nonce}{body_json}"
sig = hmac.new(
config.BFX_API_SECRET.encode("utf-8"),
signature_payload.encode("utf-8"),
hashlib.sha384,
).hexdigest()
headers = {
"bfx-nonce": nonce,
"bfx-apikey": config.BFX_API_KEY,
"bfx-signature": sig,
"content-type": "application/json",
}
url = f"{config.BFX_AUTH_URL}{path}"
resp = requests.post(url, headers=headers, data=body_json, timeout=15)
if resp.status_code >= 400:
logger.error("API %s returned %d: %s", path, resp.status_code, resp.text[:500])
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Public endpoints
# ---------------------------------------------------------------------------
def fetch_tickers(symbols: list[str] | None = None) -> dict:
"""Return {symbol: {bid, ask, last, volume, ...}} for given symbols."""
symbols = symbols or config.TOP_15_SYMBOLS
params = {"symbols": ",".join(symbols)}
raw = _pub_get("/v2/tickers", params)
tickers = {}
for t in raw:
sym = t[0]
tickers[sym] = {
"bid": t[1],
"bid_size": t[2],
"ask": t[3],
"ask_size": t[4],
"daily_change": t[5],
"daily_change_pct": t[6],
"last_price": t[7],
"volume": t[8],
"high": t[9],
"low": t[10],
}
return tickers
def fetch_candles(symbol: str, timeframe: str | None = None, limit: int | None = None) -> pd.DataFrame:
"""Return DataFrame with columns: timestamp, open, close, high, low, volume."""
timeframe = timeframe or config.CANDLE_TIMEFRAME
limit = limit or config.CANDLE_LIMIT
path = f"/v2/candles/trade:{timeframe}:{symbol}/hist"
raw = _pub_get(path, {"limit": limit, "sort": -1})
if not raw:
return pd.DataFrame()
df = pd.DataFrame(raw, columns=["timestamp", "open", "close", "high", "low", "volume"])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
return df
def fetch_order_book(symbol: str, precision: str = "P0") -> dict:
"""Return raw order book entries."""
path = f"/v2/book/{symbol}/{precision}"
raw = _pub_get(path, {"len": 25})
bids, asks = [], []
for entry in raw:
price, count, amount = entry[0], entry[1], entry[2]
if amount > 0:
bids.append({"price": price, "count": count, "amount": amount})
else:
asks.append({"price": price, "count": count, "amount": abs(amount)})
return {"bids": bids, "asks": asks}
# ---------------------------------------------------------------------------
# Authenticated endpoints
# ---------------------------------------------------------------------------
def fetch_account_status() -> dict:
"""Return wallet balances and active positions from Bitfinex."""
if config.PAPER_TRADING and not config.BFX_API_KEY:
return {"wallets": [], "positions": []}
try:
wallets_raw = _auth_post("/v2/auth/r/wallets")
wallets = []
for w in wallets_raw:
wallets.append({
"type": w[0],
"currency": w[1],
"balance": w[2],
"unsettled": w[3],
"available": w[4],
})
positions_raw = _auth_post("/v2/auth/r/positions")
positions = []
for p in positions_raw:
positions.append({
"symbol": p[0],
"status": p[1],
"amount": p[2],
"base_price": p[3],
"pnl": p[6] if len(p) > 6 else 0,
})
return {"wallets": wallets, "positions": positions}
except Exception as e:
logger.error("Failed to fetch account status: %s", e)
return {"wallets": [], "positions": []}
def fetch_active_orders() -> list[dict]:
"""Return list of active orders."""
if config.PAPER_TRADING and not config.BFX_API_KEY:
return []
try:
raw = _auth_post("/v2/auth/r/orders")
orders = []
for o in raw:
orders.append({
"id": o[0],
"symbol": o[3],
"amount": o[6],
"price": o[16],
"status": o[13],
"type": o[8],
})
return orders
except Exception as e:
logger.error("Failed to fetch active orders: %s", e)
return []
# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------
def fetch_all_market_data(symbols: list[str] | None = None) -> dict:
"""Fetch tickers + candles (5m and 1h) for all symbols. Returns dict keyed by symbol."""
symbols = symbols or config.TOP_15_SYMBOLS
tickers = fetch_tickers(symbols)
data = {}
for sym in symbols:
try:
candles = fetch_candles(sym)
candles_htf = fetch_candles(sym, timeframe=config.HTF_TIMEFRAME, limit=config.HTF_CANDLE_LIMIT)
data[sym] = {
"ticker": tickers.get(sym, {}),
"candles": candles,
"candles_htf": candles_htf,
}
time.sleep(0.15) # Avoid Bitfinex rate limits
except Exception as e:
logger.warning("Failed to fetch data for %s: %s", sym, e)
return data
# ---------------------------------------------------------------------------
# Cache helpers
# ---------------------------------------------------------------------------
def _ensure_cache_dirs():
os.makedirs(config.CANDLES_CACHE_DIR, exist_ok=True)
os.makedirs(config.INDICATORS_CACHE_DIR, exist_ok=True)
def cache_market_data(market_data: dict, indicators_data: dict | None = None):
"""Write candle and indicator data to cache/ as JSON files."""
_ensure_cache_dirs()
for sym, md in market_data.items():
candles_df = md.get("candles")
if candles_df is not None and not candles_df.empty:
path = os.path.join(config.CANDLES_CACHE_DIR, f"{sym}.json")
candles_df.to_json(path, orient="records", date_format="iso")
if indicators_data:
for sym, ind_df in indicators_data.items():
if ind_df is not None and not ind_df.empty:
path = os.path.join(config.INDICATORS_CACHE_DIR, f"{sym}.json")
ind_df.to_json(path, orient="records", date_format="iso")
def load_cached_data() -> tuple[dict, dict]:
"""Load cached candles and indicators. Returns (candles_dict, indicators_dict)."""
_ensure_cache_dirs()
candles, indicators = {}, {}
for fname in os.listdir(config.CANDLES_CACHE_DIR):
if fname.endswith(".json"):
sym = fname[:-5]
path = os.path.join(config.CANDLES_CACHE_DIR, fname)
candles[sym] = pd.read_json(path, orient="records")
for fname in os.listdir(config.INDICATORS_CACHE_DIR):
if fname.endswith(".json"):
sym = fname[:-5]
path = os.path.join(config.INDICATORS_CACHE_DIR, fname)
indicators[sym] = pd.read_json(path, orient="records")
return candles, indicators