Initial commit: LLM-driven crypto trading bot

Includes: Bitfinex API integration, technical indicators,
LLM signal generation, risk management, Slack notifications.

Recent fixes:
- SELL orders use position value instead of total balance
- SELL signals always close full position
- Failed orders added to rejected list for Slack reporting
- Position/exposure limits auto-cap to remaining room
- BUY order minimum raised to 10% of portfolio

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
kroutony 2026-03-13 03:25:18 +00:00
commit 972d66ab1b
13 changed files with 1472 additions and 0 deletions

4
.env.example Normal file
View File

@ -0,0 +1,4 @@
BFX_API_KEY=your_bitfinex_api_key_here
BFX_API_SECRET=your_bitfinex_api_secret_here
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/your/webhook/url
PAPER_TRADING=true

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
.env
__pycache__/
*.pyc
cache/
positions.json
trade_history.csv
bot_state.json
*.log
.venv/
venv/

56
config.py Normal file
View File

@ -0,0 +1,56 @@
import os
from dotenv import load_dotenv
load_dotenv()
# Bitfinex API
BFX_API_KEY = os.getenv("BFX_API_KEY", "")
BFX_API_SECRET = os.getenv("BFX_API_SECRET", "")
BFX_BASE_URL = "https://api-pub.bitfinex.com"
BFX_AUTH_URL = "https://api.bitfinex.com"
# Slack
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL", "")
# Top USDT trading pairs on Bitfinex (verified active)
# Note: some use "tXXXUST" format, others use "tXXX:UST" (5+ char base)
TOP_15_SYMBOLS = [
"tBTCUST", "tETHUST", "tSOLUST", "tXRPUST", "tADAUST",
"tDOGE:UST", "tAVAX:UST", "tDOTUST", "tLINK:UST", "tLTCUST",
"tSHIB:UST", "tATOUST", "tUNIUST", "tSUIUST", "tXLMUST",
]
# Symbol display names for readability
SYMBOL_NAMES = {
"tBTCUST": "BTC/USDT", "tETHUST": "ETH/USDT", "tSOLUST": "SOL/USDT",
"tXRPUST": "XRP/USDT", "tADAUST": "ADA/USDT", "tDOGE:UST": "DOGE/USDT",
"tAVAX:UST": "AVAX/USDT", "tDOTUST": "DOT/USDT", "tLINK:UST": "LINK/USDT",
"tLTCUST": "LTC/USDT", "tSHIB:UST": "SHIB/USDT", "tATOUST": "ATOM/USDT",
"tUNIUST": "UNI/USDT", "tSUIUST": "SUI/USDT", "tXLMUST": "XLM/USDT",
}
# Candle settings
CANDLE_TIMEFRAME = "5m"
CANDLE_LIMIT = 100
# Trading parameters
MIN_ORDER_USDT = 5
STOP_LOSS_PCT = 0.02
TAKE_PROFIT_PCT = 0.03
MAX_POSITION_PCT = 0.30 # Max 30% of portfolio per coin
MAX_TOTAL_EXPOSURE_PCT = 1.00 # No cap — use full balance
# Mode
PAPER_TRADING = os.getenv("PAPER_TRADING", "true").lower() == "true"
# Cache
CACHE_DIR = "cache/"
CANDLES_CACHE_DIR = os.path.join(CACHE_DIR, "candles/")
INDICATORS_CACHE_DIR = os.path.join(CACHE_DIR, "indicators/")
# Positions file
POSITIONS_FILE = "positions.json"
# Schedule
RUN_INTERVAL_MINUTES = 5
STATUS_REPORT_INTERVAL_MINUTES = 60

224
data_fetcher.py Normal file
View File

@ -0,0 +1,224 @@
import hashlib
import hmac
import json
import logging
import os
import time
import pandas as pd
import requests
import config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Public API helpers
# ---------------------------------------------------------------------------
def _pub_get(path: str, params: dict | None = None) -> dict | list:
url = f"{config.BFX_BASE_URL}{path}"
resp = requests.get(url, params=params, timeout=15)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Authenticated API helpers
# ---------------------------------------------------------------------------
def _auth_post(path: str, body: dict | None = None) -> dict | list:
body = body or {}
nonce = str(int(time.time() * 1000000))
body_json = json.dumps(body)
signature_payload = f"/api{path}{nonce}{body_json}"
sig = hmac.new(
config.BFX_API_SECRET.encode("utf-8"),
signature_payload.encode("utf-8"),
hashlib.sha384,
).hexdigest()
headers = {
"bfx-nonce": nonce,
"bfx-apikey": config.BFX_API_KEY,
"bfx-signature": sig,
"content-type": "application/json",
}
url = f"{config.BFX_AUTH_URL}{path}"
resp = requests.post(url, headers=headers, data=body_json, timeout=15)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Public endpoints
# ---------------------------------------------------------------------------
def fetch_tickers(symbols: list[str] | None = None) -> dict:
"""Return {symbol: {bid, ask, last, volume, ...}} for given symbols."""
symbols = symbols or config.TOP_15_SYMBOLS
params = {"symbols": ",".join(symbols)}
raw = _pub_get("/v2/tickers", params)
tickers = {}
for t in raw:
sym = t[0]
tickers[sym] = {
"bid": t[1],
"bid_size": t[2],
"ask": t[3],
"ask_size": t[4],
"daily_change": t[5],
"daily_change_pct": t[6],
"last_price": t[7],
"volume": t[8],
"high": t[9],
"low": t[10],
}
return tickers
def fetch_candles(symbol: str, timeframe: str | None = None, limit: int | None = None) -> pd.DataFrame:
"""Return DataFrame with columns: timestamp, open, close, high, low, volume."""
timeframe = timeframe or config.CANDLE_TIMEFRAME
limit = limit or config.CANDLE_LIMIT
path = f"/v2/candles/trade:{timeframe}:{symbol}/hist"
raw = _pub_get(path, {"limit": limit, "sort": -1})
if not raw:
return pd.DataFrame()
df = pd.DataFrame(raw, columns=["timestamp", "open", "close", "high", "low", "volume"])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
return df
def fetch_order_book(symbol: str, precision: str = "P0") -> dict:
"""Return raw order book entries."""
path = f"/v2/book/{symbol}/{precision}"
raw = _pub_get(path, {"len": 25})
bids, asks = [], []
for entry in raw:
price, count, amount = entry[0], entry[1], entry[2]
if amount > 0:
bids.append({"price": price, "count": count, "amount": amount})
else:
asks.append({"price": price, "count": count, "amount": abs(amount)})
return {"bids": bids, "asks": asks}
# ---------------------------------------------------------------------------
# Authenticated endpoints
# ---------------------------------------------------------------------------
def fetch_account_status() -> dict:
"""Return wallet balances and active positions from Bitfinex."""
if config.PAPER_TRADING and not config.BFX_API_KEY:
return {"wallets": [], "positions": []}
try:
wallets_raw = _auth_post("/v2/auth/r/wallets")
wallets = []
for w in wallets_raw:
wallets.append({
"type": w[0],
"currency": w[1],
"balance": w[2],
"unsettled": w[3],
"available": w[4],
})
positions_raw = _auth_post("/v2/auth/r/positions")
positions = []
for p in positions_raw:
positions.append({
"symbol": p[0],
"status": p[1],
"amount": p[2],
"base_price": p[3],
"pnl": p[6] if len(p) > 6 else 0,
})
return {"wallets": wallets, "positions": positions}
except Exception as e:
logger.error("Failed to fetch account status: %s", e)
return {"wallets": [], "positions": []}
def fetch_active_orders() -> list[dict]:
"""Return list of active orders."""
if config.PAPER_TRADING and not config.BFX_API_KEY:
return []
try:
raw = _auth_post("/v2/auth/r/orders")
orders = []
for o in raw:
orders.append({
"id": o[0],
"symbol": o[3],
"amount": o[6],
"price": o[16],
"status": o[13],
"type": o[8],
})
return orders
except Exception as e:
logger.error("Failed to fetch active orders: %s", e)
return []
# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------
def fetch_all_market_data(symbols: list[str] | None = None) -> dict:
"""Fetch tickers + candles for all symbols. Returns dict keyed by symbol."""
symbols = symbols or config.TOP_15_SYMBOLS
tickers = fetch_tickers(symbols)
data = {}
for sym in symbols:
try:
candles = fetch_candles(sym)
data[sym] = {
"ticker": tickers.get(sym, {}),
"candles": candles,
}
except Exception as e:
logger.warning("Failed to fetch data for %s: %s", sym, e)
return data
# ---------------------------------------------------------------------------
# Cache helpers
# ---------------------------------------------------------------------------
def _ensure_cache_dirs():
os.makedirs(config.CANDLES_CACHE_DIR, exist_ok=True)
os.makedirs(config.INDICATORS_CACHE_DIR, exist_ok=True)
def cache_market_data(market_data: dict, indicators_data: dict | None = None):
"""Write candle and indicator data to cache/ as JSON files."""
_ensure_cache_dirs()
for sym, md in market_data.items():
candles_df = md.get("candles")
if candles_df is not None and not candles_df.empty:
path = os.path.join(config.CANDLES_CACHE_DIR, f"{sym}.json")
candles_df.to_json(path, orient="records", date_format="iso")
if indicators_data:
for sym, ind_df in indicators_data.items():
if ind_df is not None and not ind_df.empty:
path = os.path.join(config.INDICATORS_CACHE_DIR, f"{sym}.json")
ind_df.to_json(path, orient="records", date_format="iso")
def load_cached_data() -> tuple[dict, dict]:
"""Load cached candles and indicators. Returns (candles_dict, indicators_dict)."""
_ensure_cache_dirs()
candles, indicators = {}, {}
for fname in os.listdir(config.CANDLES_CACHE_DIR):
if fname.endswith(".json"):
sym = fname[:-5]
path = os.path.join(config.CANDLES_CACHE_DIR, fname)
candles[sym] = pd.read_json(path, orient="records")
for fname in os.listdir(config.INDICATORS_CACHE_DIR):
if fname.endswith(".json"):
sym = fname[:-5]
path = os.path.join(config.INDICATORS_CACHE_DIR, fname)
indicators[sym] = pd.read_json(path, orient="records")
return candles, indicators

101
indicators.py Normal file
View File

@ -0,0 +1,101 @@
import logging
import pandas as pd
import ta
import config
logger = logging.getLogger(__name__)
def calculate_indicators(candles_df: pd.DataFrame) -> pd.DataFrame:
"""Add technical indicators to a candles DataFrame.
Expected input columns: timestamp, open, close, high, low, volume.
Returns the DataFrame with additional indicator columns.
"""
df = candles_df.copy()
if df.empty or len(df) < 26:
logger.warning("Not enough candle data to compute indicators (got %d rows)", len(df))
return df
close = df["close"]
high = df["high"]
low = df["low"]
volume = df["volume"]
# RSI(14)
df["rsi"] = ta.momentum.rsi(close, window=14)
# MACD(12, 26, 9)
macd = ta.trend.MACD(close, window_slow=26, window_fast=12, window_sign=9)
df["macd"] = macd.macd()
df["macd_signal"] = macd.macd_signal()
df["macd_hist"] = macd.macd_diff()
# Bollinger Bands(20, 2)
bb = ta.volatility.BollingerBands(close, window=20, window_dev=2)
df["bb_upper"] = bb.bollinger_hband()
df["bb_middle"] = bb.bollinger_mavg()
df["bb_lower"] = bb.bollinger_lband()
# EMA(9) and EMA(21)
df["ema9"] = ta.trend.ema_indicator(close, window=9)
df["ema21"] = ta.trend.ema_indicator(close, window=21)
# VWAP (cumulative for the session)
tp = (high + low + close) / 3
cumvol = volume.cumsum()
df["vwap"] = (tp * volume).cumsum() / cumvol.replace(0, float("nan"))
# ATR(14)
df["atr"] = ta.volatility.average_true_range(high, low, close, window=14)
# Volume moving average (20)
df["vol_ma20"] = volume.rolling(window=20).mean()
return df
def summarize_indicators(df: pd.DataFrame, symbol: str) -> str:
"""Produce a concise text summary of the latest indicator values for LLM consumption."""
if df.empty or len(df) < 2:
return f"{symbol}: insufficient data\n"
last = df.iloc[-1]
prev = df.iloc[-2]
name = config.SYMBOL_NAMES.get(symbol, symbol)
lines = [f"### {name} ({symbol})"]
lines.append(f"Price: {last['close']:.6g} | High: {last['high']:.6g} | Low: {last['low']:.6g}")
lines.append(f"Volume: {last['volume']:.2f} | Vol MA20: {last.get('vol_ma20', 0):.2f}")
if pd.notna(last.get("rsi")):
lines.append(f"RSI(14): {last['rsi']:.1f}")
if pd.notna(last.get("macd")):
hist_dir = "" if last["macd_hist"] > prev.get("macd_hist", 0) else ""
lines.append(
f"MACD: {last['macd']:.6g} | Signal: {last['macd_signal']:.6g} | "
f"Hist: {last['macd_hist']:.6g} ({hist_dir})"
)
if pd.notna(last.get("bb_upper")):
lines.append(
f"BB: Upper={last['bb_upper']:.6g} Mid={last['bb_middle']:.6g} Lower={last['bb_lower']:.6g}"
)
if pd.notna(last.get("ema9")):
trend = "bullish" if last["ema9"] > last["ema21"] else "bearish"
lines.append(f"EMA9: {last['ema9']:.6g} | EMA21: {last['ema21']:.6g}{trend}")
if pd.notna(last.get("atr")):
lines.append(f"ATR(14): {last['atr']:.6g}")
if pd.notna(last.get("vwap")):
lines.append(f"VWAP: {last['vwap']:.6g}")
return "\n".join(lines) + "\n"
def summarize_all(indicators_by_symbol: dict[str, pd.DataFrame]) -> str:
"""Build a combined indicator summary for all symbols."""
parts = []
for sym, df in sorted(indicators_by_symbol.items()):
parts.append(summarize_indicators(df, sym))
return "\n".join(parts)

101
llm_analyzer.py Normal file
View File

@ -0,0 +1,101 @@
import json
import logging
import re
import subprocess
logger = logging.getLogger(__name__)
def analyze_market(indicator_summary: str, account_status: str) -> list[dict]:
"""Call Claude CLI to analyze market data and return trade signals."""
prompt = f"""你是一位專業的加密貨幣短線交易分析師。
## 帳戶狀態
{account_status}
## 目前市場指標摘要5分鐘K線最新值
{indicator_summary}
請根據以下策略分析每個幣種並給出交易建議
## 交易策略
1. **趨勢確認**: EMA(9) > EMA(21) 為多頭反之為空頭
2. **進場訊號 (BUY)**:
- RSI < 35 MACD histogram 由負轉正超賣反彈
- 價格觸及 Bollinger 下軌且 RSI < 40均值回歸
- MACD 金叉 + EMA(9) 上穿 EMA(21)趨勢啟動
3. **進場訊號 (SELL)**:
- RSI > 70 MACD histogram 由正轉負超買回落
- 價格觸及 Bollinger 上軌且 RSI > 65
- MACD 死叉 + EMA(9) 下穿 EMA(21)
4. **過濾條件**:
- 成交量需高於 20 期平均確認動能
- ATR 過高時降低倉位波動風控
請以 JSON 格式回傳每個幣種一個物件
```json
[
{{
"symbol": "tBTCUST",
"action": "BUY" | "SELL" | "HOLD",
"confidence": 0.0-1.0,
"reason": "簡短理由",
"suggested_amount_pct": 0.10-0.20
}}
]
```
只回傳 JSON不要其他文字"""
try:
result = subprocess.run(
["claude", "-p", prompt, "--output-format", "json"],
capture_output=True,
text=True,
timeout=120,
)
if result.returncode != 0:
logger.error("Claude CLI failed (rc=%d): %s", result.returncode, result.stderr)
return []
return _parse_llm_response(result.stdout)
except subprocess.TimeoutExpired:
logger.error("Claude CLI timed out")
return []
except FileNotFoundError:
logger.error("Claude CLI not found — is 'claude' installed and in PATH?")
return []
except Exception as e:
logger.error("LLM analysis error: %s", e)
return []
def _parse_llm_response(raw: str) -> list[dict]:
"""Extract the JSON array from Claude's response."""
# First try: the output-format json wraps response in {"result": "..."}
try:
wrapper = json.loads(raw)
if isinstance(wrapper, dict) and "result" in wrapper:
raw = wrapper["result"]
except (json.JSONDecodeError, TypeError):
pass
# Try direct parse
if isinstance(raw, list):
return raw
try:
parsed = json.loads(raw)
if isinstance(parsed, list):
return parsed
except (json.JSONDecodeError, TypeError):
pass
# Try extracting JSON array from markdown code block or mixed text
match = re.search(r'\[[\s\S]*?\]', raw)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
logger.warning("Could not parse LLM response as JSON array")
return []

282
main.py Normal file
View File

@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""LLM-driven cryptocurrency trading system — single-run mode for crontab."""
import json
import logging
import os
import sys
import time
from datetime import datetime
import config
import data_fetcher
import indicators
import llm_analyzer
import portfolio as pf
import risk_manager
import slack_notifier
import trade_logger
import trader
# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("trading.log"),
],
)
logger = logging.getLogger("main")
STATE_FILE = "bot_state.json"
def _load_state() -> dict:
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
pass
return {"last_status_report": 0, "run_count": 0}
def _save_state(state: dict):
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def run_cycle():
"""Execute one full trading cycle."""
state = _load_state()
state["run_count"] = state.get("run_count", 0) + 1
logger.info("=" * 60)
logger.info("Trading cycle #%d at %s", state["run_count"], datetime.now().isoformat())
# 1. Load portfolio state
port = pf.load_positions()
# 2. Fetch account status from exchange
try:
account_status = data_fetcher.fetch_account_status()
port = pf.sync_with_exchange(port, account_status)
logger.info(
"Account: %.2f USDT total, %.2f USDT available",
port.get("total_balance_usdt", 0),
port.get("available_usdt", 0),
)
except Exception as e:
logger.error("Failed to fetch account status: %s", e)
if config.PAPER_TRADING and port.get("total_balance_usdt", 0) == 0:
port["total_balance_usdt"] = 10000
port["available_usdt"] = 10000
logger.info("Paper trading: using default 10000 USDT balance")
# 2b. Backfill missing stop-loss orders for existing positions
for sym, pos in port.get("positions", {}).items():
if pos.get("amount", 0) > 0 and not pos.get("stop_order_id"):
entry = pos.get("entry_price", 0)
amount = pos.get("amount", 0)
if entry > 0 and amount > 0:
logger.warning("Position %s missing stop-loss order, placing now", sym)
sl = trader.place_stop_loss_order(sym, amount, entry)
if sl:
pos["stop_order_id"] = sl["order_id"]
pos["stop_price"] = sl["stop_price"]
pf.save_positions(port)
logger.info("Backfill stop-loss for %s @ %.6g", sym, sl["stop_price"])
# 3. Fetch market data
try:
market_data = data_fetcher.fetch_all_market_data()
logger.info("Fetched market data for %d symbols", len(market_data))
except Exception as e:
logger.error("Failed to fetch market data: %s", e)
slack_notifier.send_error_alert(f"Market data fetch failed: {e}")
return
# 4. Calculate indicators
indicators_by_symbol = {}
current_prices = {}
for sym, md in market_data.items():
candles_df = md.get("candles")
if candles_df is not None and not candles_df.empty:
ind_df = indicators.calculate_indicators(candles_df)
indicators_by_symbol[sym] = ind_df
ticker = md.get("ticker", {})
if ticker:
current_prices[sym] = ticker.get("last_price", 0)
# 5. Cache data (persisted for next crontab run)
try:
data_fetcher.cache_market_data(market_data, indicators_by_symbol)
except Exception as e:
logger.warning("Cache write failed: %s", e)
# --- Collect cycle results ---
tp_closed = [] # take-profit closures
executed = [] # successfully executed trades
rejected = [] # signals rejected by risk manager
# 6. Check take-profit on existing positions (stop-loss is handled by exchange stop orders)
tp_signals = risk_manager.apply_stop_loss_take_profit(
port.get("positions", {}), current_prices
)
for signal in tp_signals:
sym = signal["symbol"]
pos = port.get("positions", {}).get(sym, {})
amount = pos.get("amount", 0)
price = current_prices.get(sym, 0)
if amount > 0 and price > 0:
# Cancel the exchange stop-loss order before selling
stop_order_id = pos.get("stop_order_id")
if stop_order_id:
trader.cancel_order(stop_order_id)
signal["amount_usdt"] = amount * price
result = trader.execute_trade(signal, current_prices)
if result and result.get("status") in ("filled", "submitted"):
port = pf.update_position(port, sym, "SELL", amount, price, signal["amount_usdt"])
pf.save_positions(port)
trade_logger.log_trade(result)
tp_closed.append({**signal, "amount_usdt": amount * price})
logger.info("Take profit executed: %s", sym)
# 7. Build indicator summary for LLM
indicator_summary = indicators.summarize_all(indicators_by_symbol)
# 8. Build account status string for LLM
account_str = _build_account_string(port, current_prices)
# 9. Call LLM for analysis
signals = []
try:
signals = llm_analyzer.analyze_market(indicator_summary, account_str)
logger.info("LLM returned %d signals", len(signals))
except Exception as e:
logger.error("LLM analysis failed: %s", e)
slack_notifier.send_error_alert(f"LLM analysis failed: {e}")
# 10. Validate and execute trades
for signal in signals:
action = signal.get("action", "HOLD")
if action == "HOLD":
continue
ind_df = indicators_by_symbol.get(signal.get("symbol"))
validated, reject_reason = risk_manager.validate_trade(signal, port, ind_df)
if validated is None:
logger.info("Signal rejected by risk manager: %s %s%s", action, signal.get("symbol"), reject_reason)
rejected.append({**signal, "reject_reason": reject_reason})
continue
result = trader.execute_trade(validated, current_prices)
if result and result.get("status") == "failed":
logger.warning("Order failed at exchange: %s %s%s", action, validated["symbol"], result.get("error", ""))
rejected.append({**validated, "reject_reason": f"交易所錯誤: {result.get('error', 'unknown')}"})
continue
if result and result.get("status") in ("filled", "submitted"):
sym = validated["symbol"]
amount = result.get("amount", 0)
price = result.get("price", 0)
amount_usdt = result.get("amount_usdt", 0)
if action == "SELL":
# Cancel existing stop-loss order before selling
pos = port.get("positions", {}).get(sym, {})
stop_order_id = pos.get("stop_order_id")
if stop_order_id:
trader.cancel_order(stop_order_id)
port = pf.update_position(port, sym, action, amount, price, amount_usdt)
stop_price = None
if action == "BUY":
# Place exchange stop-loss order immediately after buy
sl = trader.place_stop_loss_order(sym, amount, price)
if sl:
port.setdefault("positions", {}).setdefault(sym, {})["stop_order_id"] = sl["order_id"]
port["positions"][sym]["stop_price"] = sl["stop_price"]
stop_price = sl["stop_price"]
logger.info("Stop-loss set for %s @ %.6g", sym, sl["stop_price"])
pf.save_positions(port)
trade_logger.log_trade(result)
executed.append({**result, "stop_price": stop_price})
# 11. Send unified cycle report to Slack
portfolio_summary = _build_portfolio_one_liner(port, current_prices)
slack_notifier.send_cycle_report(
cycle_number=state["run_count"],
signals=signals,
executed=executed,
rejected=rejected,
tp_closed=tp_closed,
portfolio_summary=portfolio_summary,
)
# 12. Save final state
pf.save_positions(port)
# 13. Hourly status report (check elapsed time across crontab runs)
now = time.time()
last_report = state.get("last_status_report", 0)
if now - last_report >= config.STATUS_REPORT_INTERVAL_MINUTES * 60:
summary = pf.get_portfolio_summary(port, current_prices)
slack_notifier.send_status_update(summary)
logger.info("Hourly status report sent")
state["last_status_report"] = now
state["last_run"] = now
_save_state(state)
logger.info("Cycle complete")
def _build_portfolio_one_liner(port: dict, current_prices: dict) -> str:
"""Build a one-line portfolio summary for the cycle report."""
available = port.get("available_usdt", 0)
positions = port.get("positions", {})
pos_count = sum(1 for p in positions.values() if p.get("amount", 0) > 0)
unrealized = 0.0
for sym, pos in positions.items():
amount = pos.get("amount", 0)
entry = pos.get("entry_price", 0)
current = current_prices.get(sym, 0)
if amount > 0 and entry > 0 and current > 0:
unrealized += amount * (current - entry)
return f"{available:.2f} USDT 可用 | 持倉 {pos_count} 筆 | 未實現 {unrealized:+.2f} USDT"
def _build_account_string(port: dict, current_prices: dict) -> str:
"""Build a concise account status string for the LLM prompt."""
lines = []
lines.append(f"Total Balance: {port.get('total_balance_usdt', 0):.2f} USDT")
lines.append(f"Available: {port.get('available_usdt', 0):.2f} USDT")
positions = port.get("positions", {})
if positions:
lines.append("\nOpen Positions:")
for sym, pos in positions.items():
amount = pos.get("amount", 0)
entry = pos.get("entry_price", 0)
current = current_prices.get(sym, 0)
pnl_pct = ((current - entry) / entry * 100) if entry > 0 and current > 0 else 0
name = config.SYMBOL_NAMES.get(sym, sym)
lines.append(
f" {name}: {amount:.6f} @ {entry:.6g} "
f"(now {current:.6g}, {pnl_pct:+.2f}%)"
)
else:
lines.append("\nNo open positions.")
return "\n".join(lines)
if __name__ == "__main__":
mode = "PAPER" if config.PAPER_TRADING else "LIVE"
logger.info("Running in %s mode, monitoring %d symbols", mode, len(config.TOP_15_SYMBOLS))
run_cycle()

169
portfolio.py Normal file
View File

@ -0,0 +1,169 @@
import json
import logging
import os
import time
import config
logger = logging.getLogger(__name__)
def load_positions() -> dict:
"""Load positions from positions.json."""
if not os.path.exists(config.POSITIONS_FILE):
return _empty_portfolio()
try:
with open(config.POSITIONS_FILE) as f:
data = json.load(f)
return data
except (json.JSONDecodeError, IOError) as e:
logger.error("Failed to load positions: %s", e)
return _empty_portfolio()
def save_positions(portfolio: dict):
"""Save portfolio state to positions.json."""
with open(config.POSITIONS_FILE, "w") as f:
json.dump(portfolio, f, indent=2)
def _empty_portfolio() -> dict:
return {
"total_balance_usdt": 0,
"available_usdt": 0,
"positions": {},
"trade_history": [],
"last_updated": time.time(),
}
def update_position(portfolio: dict, symbol: str, action: str, amount: float, price: float, amount_usdt: float) -> dict:
"""Update portfolio after a trade execution."""
positions = portfolio.setdefault("positions", {})
if action == "BUY":
existing = positions.get(symbol, {"amount": 0, "entry_price": 0, "value_usdt": 0})
old_amount = existing.get("amount", 0)
old_entry = existing.get("entry_price", 0)
new_amount = old_amount + amount
# Weighted average entry price
if new_amount > 0:
new_entry = (old_amount * old_entry + amount * price) / new_amount
else:
new_entry = price
positions[symbol] = {
"amount": new_amount,
"entry_price": new_entry,
"value_usdt": new_amount * price,
"last_update": time.time(),
}
portfolio["available_usdt"] = portfolio.get("available_usdt", 0) - amount_usdt
elif action == "SELL":
existing = positions.get(symbol, {})
old_amount = existing.get("amount", 0)
sell_amount = min(amount, old_amount)
remaining = old_amount - sell_amount
if remaining <= 0.000001:
positions.pop(symbol, None)
else:
positions[symbol] = {
"amount": remaining,
"entry_price": existing.get("entry_price", price),
"value_usdt": remaining * price,
"last_update": time.time(),
}
portfolio["available_usdt"] = portfolio.get("available_usdt", 0) + sell_amount * price
# Record trade
portfolio.setdefault("trade_history", []).append({
"symbol": symbol,
"action": action,
"amount": amount,
"price": price,
"amount_usdt": amount_usdt,
"timestamp": time.time(),
})
portfolio["last_updated"] = time.time()
return portfolio
def calculate_pnl(portfolio: dict, current_prices: dict) -> dict:
"""Calculate unrealized P&L for all positions."""
pnl = {}
for sym, pos in portfolio.get("positions", {}).items():
amount = pos.get("amount", 0)
entry = pos.get("entry_price", 0)
current = current_prices.get(sym, 0)
if amount > 0 and entry > 0 and current > 0:
unrealized = (current - entry) * amount
unrealized_pct = (current - entry) / entry
pnl[sym] = {
"amount": amount,
"entry_price": entry,
"current_price": current,
"unrealized_usdt": unrealized,
"unrealized_pct": unrealized_pct,
"value_usdt": amount * current,
}
return pnl
def get_portfolio_summary(portfolio: dict, current_prices: dict) -> str:
"""Generate a human-readable portfolio summary."""
pnl = calculate_pnl(portfolio, current_prices)
total_value = portfolio.get("available_usdt", 0)
total_unrealized = 0
lines = ["📊 *Portfolio Summary*"]
lines.append(f"Available USDT: {portfolio.get('available_usdt', 0):.2f}")
lines.append("")
if pnl:
lines.append("*Open Positions:*")
for sym, p in pnl.items():
name = config.SYMBOL_NAMES.get(sym, sym)
emoji = "🟢" if p["unrealized_pct"] >= 0 else "🔴"
lines.append(
f"{emoji} {name}: {p['amount']:.6f} @ {p['entry_price']:.6g}"
f"{p['current_price']:.6g} ({p['unrealized_pct']:+.2%}) "
f"= {p['unrealized_usdt']:+.2f} USDT"
)
total_value += p["value_usdt"]
total_unrealized += p["unrealized_usdt"]
lines.append("")
lines.append(f"*Total Portfolio Value:* {total_value:.2f} USDT")
lines.append(f"*Total Unrealized P&L:* {total_unrealized:+.2f} USDT")
history = portfolio.get("trade_history", [])
if history:
lines.append(f"\n*Trades today:* {len(history)}")
return "\n".join(lines)
def sync_with_exchange(portfolio: dict, account_status: dict) -> dict:
"""Sync portfolio with exchange account status."""
wallets = account_status.get("wallets", [])
for w in wallets:
if w.get("type") == "exchange" and w.get("currency") in ("UST", "USDT"):
portfolio["total_balance_usdt"] = w.get("balance", 0)
portfolio["available_usdt"] = w.get("available", 0) or w.get("balance", 0)
break
# Sync exchange positions
for p in account_status.get("positions", []):
sym = p.get("symbol", "")
if sym and p.get("amount", 0) != 0:
portfolio.setdefault("positions", {})[sym] = {
"amount": abs(p["amount"]),
"entry_price": p.get("base_price", 0),
"value_usdt": abs(p["amount"]) * p.get("base_price", 0),
"last_update": time.time(),
"source": "exchange",
}
portfolio["last_updated"] = time.time()
return portfolio

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
requests>=2.31.0
pandas>=2.0.0
ta>=0.11.0
python-dotenv>=1.0.0

174
risk_manager.py Normal file
View File

@ -0,0 +1,174 @@
import logging
import config
logger = logging.getLogger(__name__)
def check_position_limit(symbol: str, amount_usdt: float, portfolio: dict) -> bool:
"""Reject if a single position would exceed MAX_POSITION_PCT of total balance."""
total_balance = portfolio.get("total_balance_usdt", 0)
if total_balance <= 0:
return False
current_exposure = portfolio.get("positions", {}).get(symbol, {}).get("value_usdt", 0)
new_exposure = current_exposure + amount_usdt
limit = total_balance * config.MAX_POSITION_PCT
if new_exposure > limit:
logger.info(
"Position limit: %s would be %.2f USDT (limit %.2f)",
symbol, new_exposure, limit,
)
return False
return True
def check_total_exposure(portfolio: dict, additional_usdt: float = 0) -> bool:
"""Reject if total exposure would exceed MAX_TOTAL_EXPOSURE_PCT."""
total_balance = portfolio.get("total_balance_usdt", 0)
if total_balance <= 0:
return False
current_exposure = sum(
pos.get("value_usdt", 0) for pos in portfolio.get("positions", {}).values()
)
new_total = current_exposure + additional_usdt
limit = total_balance * config.MAX_TOTAL_EXPOSURE_PCT
if new_total > limit:
logger.info(
"Total exposure would be %.2f USDT (limit %.2f)", new_total, limit,
)
return False
return True
def apply_stop_loss_take_profit(positions: dict, current_prices: dict) -> list[dict]:
"""Check positions for take-profit AND software stop-loss safety net.
Returns SELL signals for:
- Take-profit hits (pnl >= TAKE_PROFIT_PCT)
- Stop-loss safety net (pnl <= -STOP_LOSS_PCT) second line of defence
in case the exchange stop order was cancelled or failed.
"""
signals = []
for sym, pos in positions.items():
if pos.get("amount", 0) == 0:
continue
entry = pos.get("entry_price", 0)
if entry <= 0:
continue
current = current_prices.get(sym, 0)
if current <= 0:
continue
pnl_pct = (current - entry) / entry
if pnl_pct >= config.TAKE_PROFIT_PCT:
signals.append({
"symbol": sym,
"action": "SELL",
"reason": f"停利 ({pnl_pct:+.2%})",
"confidence": 1.0,
"suggested_amount_pct": 1.0,
})
logger.info("TAKE PROFIT %s: %.6g%.6g (%+.2f%%)", sym, entry, current, pnl_pct * 100)
elif pnl_pct <= -config.STOP_LOSS_PCT:
signals.append({
"symbol": sym,
"action": "SELL",
"reason": f"止損安全網 ({pnl_pct:+.2%})",
"confidence": 1.0,
"suggested_amount_pct": 1.0,
})
logger.warning(
"STOP-LOSS SAFETY NET %s: %.6g%.6g (%+.2f%%) — exchange stop order may have failed",
sym, entry, current, pnl_pct * 100,
)
return signals
def adjust_size_by_volatility(amount_usdt: float, atr: float, price: float) -> float:
"""Reduce position size when ATR is high relative to price."""
if price <= 0 or atr <= 0:
return amount_usdt
atr_pct = atr / price
# If ATR > 2%, scale down linearly (at 4% ATR → half size)
if atr_pct > 0.02:
factor = max(0.25, 0.02 / atr_pct)
adjusted = amount_usdt * factor
logger.info("Volatility adj: %.2f%.2f USDT (ATR %.2f%%)", amount_usdt, adjusted, atr_pct * 100)
return adjusted
return amount_usdt
def validate_trade(signal: dict, portfolio: dict, indicators_df=None) -> tuple[dict | None, str]:
"""Run all risk checks. Returns (signal, "") on success or (None, reason) on rejection."""
action = signal.get("action", "HOLD")
if action == "HOLD":
return None, "HOLD signal"
symbol = signal["symbol"]
total_balance = portfolio.get("total_balance_usdt", 0)
pct = signal.get("suggested_amount_pct", 0.1)
amount_usdt = total_balance * pct
if action == "BUY":
# Check minimum order
if amount_usdt < config.MIN_ORDER_USDT:
reason = f"Order too small: {amount_usdt:.2f} < {config.MIN_ORDER_USDT:.2f} USDT"
logger.info(reason)
return None, reason
# Volatility adjustment
if indicators_df is not None and not indicators_df.empty:
last = indicators_df.iloc[-1]
atr = last.get("atr", 0)
price = last.get("close", 0)
if atr and price:
amount_usdt = adjust_size_by_volatility(amount_usdt, atr, price)
# Available balance check — cap to available instead of rejecting
available = portfolio.get("available_usdt", 0)
if amount_usdt > available:
if available < config.MIN_ORDER_USDT:
return None, f"可用餘額不足 ({available:.2f} < {config.MIN_ORDER_USDT:.2f} USDT)"
logger.info("金額超出餘額,調整: %.2f%.2f USDT", amount_usdt, available)
amount_usdt = available
# Position limit — cap to remaining room instead of rejecting
if not check_position_limit(symbol, amount_usdt, portfolio):
limit = total_balance * config.MAX_POSITION_PCT
current_exposure = portfolio.get("positions", {}).get(symbol, {}).get("value_usdt", 0)
room = limit - current_exposure
if room < config.MIN_ORDER_USDT:
return None, f"持倉已達上限 ({current_exposure:.2f} / {limit:.2f} USDT)"
logger.info("持倉上限調整: %.2f%.2f USDT", amount_usdt, room)
amount_usdt = room
# Total exposure — cap to remaining room instead of rejecting
if not check_total_exposure(portfolio, amount_usdt):
limit = total_balance * config.MAX_TOTAL_EXPOSURE_PCT
current_total = sum(
pos.get("value_usdt", 0) for pos in portfolio.get("positions", {}).values()
)
room = limit - current_total
if room < config.MIN_ORDER_USDT:
return None, f"總曝險已達上限 ({current_total:.2f} / {limit:.2f} USDT)"
logger.info("總曝險上限調整: %.2f%.2f USDT", amount_usdt, room)
amount_usdt = room
elif action == "SELL":
# For sells, ensure we actually hold the position
pos = portfolio.get("positions", {}).get(symbol, {})
if pos.get("amount", 0) <= 0:
logger.info("No position to sell for %s", symbol)
return None, "無持倉,跳過"
# SELL: 一律全部賣出
pos_value = pos.get("value_usdt", 0)
if pos_value <= 0:
pos_value = pos.get("amount", 0) * pos.get("entry_price", 0)
amount_usdt = pos_value
signal = dict(signal)
signal["amount_usdt"] = amount_usdt
return signal, ""

147
slack_notifier.py Normal file
View File

@ -0,0 +1,147 @@
import logging
import time
import requests
import config
logger = logging.getLogger(__name__)
def _send(payload: dict):
"""Send a message to Slack via webhook."""
url = config.SLACK_WEBHOOK_URL
if not url:
logger.debug("Slack webhook not configured, skipping notification")
return
try:
resp = requests.post(url, json=payload, timeout=10)
if resp.status_code != 200:
logger.warning("Slack webhook returned %d: %s", resp.status_code, resp.text)
except Exception as e:
logger.error("Failed to send Slack notification: %s", e)
def send_trade_alert(trade: dict):
"""Send a trade execution notification."""
sym = trade.get("symbol", "?")
name = config.SYMBOL_NAMES.get(sym, sym)
action = trade.get("action", "?")
emoji = "🟢 BUY" if action == "BUY" else "🔴 SELL"
mode = trade.get("mode", "paper").upper()
text = (
f"{emoji} *{name}* [{mode}]\n"
f"Amount: {trade.get('amount', 0):.6f} (~{trade.get('amount_usdt', 0):.2f} USDT)\n"
f"Price: {trade.get('price', 0):.6g}\n"
f"Confidence: {trade.get('confidence', 0):.0%}\n"
f"Reason: {trade.get('reason', 'N/A')}\n"
f"Status: {trade.get('status', 'unknown')}"
)
_send({"text": text})
def send_status_update(summary: str):
"""Send periodic portfolio status update."""
_send({"text": summary})
def send_error_alert(error_msg: str):
"""Send an error alert."""
_send({"text": f"⚠️ *Trading Bot Error*\n```{error_msg}```"})
def send_cycle_report(
cycle_number: int,
signals: list[dict],
executed: list[dict],
rejected: list[dict],
tp_closed: list[dict],
portfolio_summary: str,
):
"""Send a unified cycle report combining analysis, execution, and portfolio status."""
lines = [f"📊 *Trading Cycle Report #{cycle_number}*\n"]
# --- Take-profit / auto actions ---
if tp_closed:
lines.append("⚡ *自動操作:*")
for tp in tp_closed:
name = config.SYMBOL_NAMES.get(tp["symbol"], tp["symbol"])
pnl = tp.get("reason", "")
amount_usdt = tp.get("amount_usdt", 0)
lines.append(f" 🔴 SELL {name}{pnl},平倉 {amount_usdt:.1f} USDT")
lines.append("")
# --- LLM signals ---
buys = [s for s in signals if s.get("action") == "BUY"]
sells = [s for s in signals if s.get("action") == "SELL"]
holds = [s for s in signals if s.get("action") == "HOLD"]
lines.append("🤖 *LLM 分析結果:*")
if buys:
for s in buys:
name = config.SYMBOL_NAMES.get(s["symbol"], s["symbol"])
lines.append(
f" 🟢 BUY {name} — conf {s.get('confidence', 0):.0%}, {s.get('reason', '')}"
)
if sells:
for s in sells:
name = config.SYMBOL_NAMES.get(s["symbol"], s["symbol"])
lines.append(
f" 🔴 SELL {name} — conf {s.get('confidence', 0):.0%}, {s.get('reason', '')}"
)
if holds:
lines.append(f" ⏸️ 其餘 {len(holds)} 幣種 HOLD")
if not buys and not sells:
lines.append(" All symbols → HOLD, no action this cycle.")
lines.append("")
# --- Executed trades ---
has_trades = bool(executed)
if executed:
lines.append("✅ *實際執行:*")
for t in executed:
name = config.SYMBOL_NAMES.get(t["symbol"], t["symbol"])
action = t.get("action", "?")
emoji = "🟢 BUY" if action == "BUY" else "🔴 SELL"
amount = t.get("amount", 0)
price = t.get("price", 0)
amount_usdt = t.get("amount_usdt", 0)
detail = f"{amount:.4f} @ {price:.6g} ({amount_usdt:.1f} USDT)"
stop_info = ""
if action == "BUY" and t.get("stop_price"):
stop_info = f" — 止損掛 {t['stop_price']:.6g}"
lines.append(f" {emoji} {name}{detail}{stop_info}")
lines.append("")
# --- Rejected signals ---
if rejected:
lines.append("❌ *未執行:*")
for r in rejected:
name = config.SYMBOL_NAMES.get(r["symbol"], r["symbol"])
action = r.get("action", "?")
emoji = "🟢 BUY" if action == "BUY" else "🔴 SELL"
lines.append(f" {emoji} {name}{r.get('reject_reason', '未知原因')}")
lines.append("")
# --- Portfolio summary ---
lines.append(f"💰 {portfolio_summary}")
text = "\n".join(lines)
# @channel when actual trades were executed
if has_trades:
text = f"<!channel>\n{text}"
_send({"text": text})
def send_startup_message():
"""Notify that the bot has started."""
mode = "PAPER" if config.PAPER_TRADING else "LIVE"
_send({
"text": (
f"🚀 *Trading Bot Started* [{mode} mode]\n"
f"Monitoring {len(config.TOP_15_SYMBOLS)} symbols\n"
f"Interval: {config.RUN_INTERVAL_MINUTES}min"
)
})

43
trade_logger.py Normal file
View File

@ -0,0 +1,43 @@
"""Persistent trade history logger — appends every trade to a CSV file."""
import csv
import os
from datetime import datetime
import config
TRADE_LOG_FILE = "trade_history.csv"
FIELDS = [
"timestamp", "datetime", "symbol", "name", "action", "amount",
"price", "amount_usdt", "confidence", "reason", "mode", "status",
]
def _ensure_header():
if not os.path.exists(TRADE_LOG_FILE):
with open(TRADE_LOG_FILE, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=FIELDS)
writer.writeheader()
def log_trade(trade: dict):
"""Append a trade record to the CSV log."""
_ensure_header()
sym = trade.get("symbol", "")
row = {
"timestamp": trade.get("timestamp", ""),
"datetime": datetime.now().isoformat(timespec="seconds"),
"symbol": sym,
"name": config.SYMBOL_NAMES.get(sym, sym),
"action": trade.get("action", ""),
"amount": trade.get("amount", 0),
"price": trade.get("price", 0),
"amount_usdt": trade.get("amount_usdt", 0),
"confidence": trade.get("confidence", 0),
"reason": trade.get("reason", ""),
"mode": trade.get("mode", ""),
"status": trade.get("status", ""),
}
with open(TRADE_LOG_FILE, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=FIELDS)
writer.writerow(row)

156
trader.py Normal file
View File

@ -0,0 +1,156 @@
import logging
import time
import config
from data_fetcher import _auth_post
logger = logging.getLogger(__name__)
def execute_trade(signal: dict, current_prices: dict, mode: str | None = None) -> dict | None:
"""Execute a trade signal. Returns trade result dict or None on failure."""
mode = mode or ("paper" if config.PAPER_TRADING else "live")
symbol = signal["symbol"]
action = signal["action"]
amount_usdt = signal.get("amount_usdt", 0)
price = current_prices.get(symbol, 0)
if price <= 0:
logger.error("No price for %s, cannot execute trade", symbol)
return None
# Calculate amount in base currency
amount = amount_usdt / price
if action == "SELL":
amount = -abs(amount) # negative = sell on Bitfinex
trade_result = {
"symbol": symbol,
"action": action,
"amount": abs(amount),
"amount_usdt": amount_usdt,
"price": price,
"timestamp": time.time(),
"mode": mode,
"reason": signal.get("reason", ""),
"confidence": signal.get("confidence", 0),
}
if mode == "paper":
trade_result["status"] = "filled"
trade_result["order_id"] = f"paper_{int(time.time() * 1000)}"
logger.info(
"PAPER %s %s: %.6f @ %.6g (%.2f USDT)",
action, symbol, abs(amount), price, amount_usdt,
)
else:
try:
order = _submit_order(symbol, amount)
trade_result["status"] = "submitted"
trade_result["order_id"] = order.get("id", "unknown")
logger.info(
"LIVE %s %s: %.6f @ %.6g (%.2f USDT) — order %s",
action, symbol, abs(amount), price, amount_usdt, trade_result["order_id"],
)
except Exception as e:
logger.error("Order submission failed for %s: %s", symbol, e)
trade_result["status"] = "failed"
trade_result["error"] = str(e)
return trade_result
def place_stop_loss_order(symbol: str, amount: float, entry_price: float, mode: str | None = None) -> dict | None:
"""Place an EXCHANGE STOP order as stop-loss after a BUY.
Args:
symbol: Trading pair (e.g. tBTCUST)
amount: Position size in base currency (positive, will be negated for sell)
entry_price: The buy entry price
mode: "paper" or "live"
Returns:
dict with stop order info, or None on failure.
"""
mode = mode or ("paper" if config.PAPER_TRADING else "live")
stop_price = round(entry_price * (1 - config.STOP_LOSS_PCT), 8)
sell_amount = -abs(amount) # negative = sell
if mode == "paper":
order_id = f"paper_sl_{int(time.time() * 1000)}"
logger.info(
"PAPER STOP-LOSS placed: %s sell %.6f @ %.6g (entry %.6g, SL %.1f%%)",
symbol, abs(amount), stop_price, entry_price, config.STOP_LOSS_PCT * 100,
)
return {"order_id": order_id, "stop_price": stop_price, "symbol": symbol, "mode": "paper"}
# Live: submit EXCHANGE STOP order to Bitfinex
try:
body = {
"type": "EXCHANGE STOP",
"symbol": symbol,
"amount": str(sell_amount),
"price": str(stop_price),
}
result = _auth_post("/v2/auth/w/order/submit", body)
order_id = "unknown"
if isinstance(result, list) and len(result) > 4:
order_data = result[4]
if isinstance(order_data, list) and len(order_data) > 0:
order = order_data[0] if isinstance(order_data[0], list) else order_data
order_id = order[0]
logger.info(
"LIVE STOP-LOSS placed: %s sell %.6f @ %.6g — order %s",
symbol, abs(amount), stop_price, order_id,
)
return {"order_id": order_id, "stop_price": stop_price, "symbol": symbol, "mode": "live"}
except Exception as e:
logger.error("Failed to place stop-loss for %s: %s", symbol, e)
return None
def cancel_order(order_id, mode: str | None = None) -> bool:
"""Cancel an existing order by ID."""
mode = mode or ("paper" if config.PAPER_TRADING else "live")
if mode == "paper":
logger.info("PAPER cancel order: %s", order_id)
return True
try:
_auth_post("/v2/auth/w/order/cancel", {"id": int(order_id)})
logger.info("LIVE cancel order: %s", order_id)
return True
except Exception as e:
logger.error("Failed to cancel order %s: %s", order_id, e)
return False
def _submit_order(symbol: str, amount: float) -> dict:
"""Submit an EXCHANGE MARKET order to Bitfinex."""
body = {
"type": "EXCHANGE MARKET",
"symbol": symbol,
"amount": str(amount),
}
result = _auth_post("/v2/auth/w/order/submit", body)
if isinstance(result, list) and len(result) > 4:
order_data = result[4]
if isinstance(order_data, list) and len(order_data) > 0:
order = order_data[0] if isinstance(order_data[0], list) else order_data
return {"id": order[0], "raw": order}
return {"id": "unknown", "raw": result}
def get_wallet_balance() -> float:
"""Get USDT balance from exchange wallet."""
if config.PAPER_TRADING and not config.BFX_API_KEY:
return 0
try:
wallets = _auth_post("/v2/auth/r/wallets")
for w in wallets:
if w[0] == "exchange" and w[1] in ("UST", "USDT"):
return float(w[2])
except Exception as e:
logger.error("Failed to get wallet balance: %s", e)
return 0