- New sync_cost_basis.py: recalculate entry_price from order history, sync amounts/balances from wallet (cron every 30 min) - Fix stop-loss ID staleness: update stop_orders_by_sym in step 2b after placing new stops, preventing SELL failures from stale IDs - Fix position limit inconsistency: use same total_balance in validate_trade instead of calling check_position_limit - Skip stop-loss for positions below MIN_ORDER_AMOUNT - Add API response body logging for 500 errors - Cancel "Order not found" treated as success (not error) - Post-trade wallet refresh to ensure fresh balances - Report: show total value, total return %, exclude dust positions - Crontab offset +1 min from candle close for complete data - Handle PARTIALLY FILLED order status in cost calculation - Sort orders by mts_create in Python (Bitfinex sort unreliable) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
378 lines
15 KiB
Python
378 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""LLM-driven cryptocurrency trading system — single-run mode for crontab."""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import config
|
|
import data_fetcher
|
|
import indicators
|
|
import llm_analyzer
|
|
import portfolio as pf
|
|
import risk_manager
|
|
import slack_notifier
|
|
import trade_logger
|
|
import trader
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logging setup
|
|
# ---------------------------------------------------------------------------
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler("trading.log"),
|
|
],
|
|
)
|
|
logger = logging.getLogger("main")
|
|
|
|
STATE_FILE = "bot_state.json"
|
|
|
|
|
|
def _load_state() -> dict:
|
|
if os.path.exists(STATE_FILE):
|
|
try:
|
|
with open(STATE_FILE) as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
pass
|
|
return {"last_status_report": 0, "run_count": 0}
|
|
|
|
|
|
def _save_state(state: dict):
|
|
with open(STATE_FILE, "w") as f:
|
|
json.dump(state, f, indent=2)
|
|
|
|
|
|
def run_cycle():
|
|
"""Execute one full trading cycle."""
|
|
state = _load_state()
|
|
state["run_count"] = state.get("run_count", 0) + 1
|
|
logger.info("=" * 60)
|
|
logger.info("Trading cycle #%d at %s", state["run_count"], datetime.now().isoformat())
|
|
|
|
# 1. Load portfolio state
|
|
port = pf.load_positions()
|
|
|
|
# 2. Fetch account status from exchange
|
|
try:
|
|
account_status = data_fetcher.fetch_account_status()
|
|
port = pf.sync_with_exchange(port, account_status)
|
|
logger.info(
|
|
"Account: %.2f USDT total, %.2f USDT available",
|
|
port.get("total_balance_usdt", 0),
|
|
port.get("available_usdt", 0),
|
|
)
|
|
except Exception as e:
|
|
logger.error("Failed to fetch account status: %s", e)
|
|
if config.PAPER_TRADING and port.get("total_balance_usdt", 0) == 0:
|
|
port["total_balance_usdt"] = 10000
|
|
port["available_usdt"] = 10000
|
|
logger.info("Paper trading: using default 10000 USDT balance")
|
|
|
|
# 2b. Sync stop-loss orders with exchange (real-time, not local memory)
|
|
try:
|
|
active_orders = data_fetcher.fetch_active_orders()
|
|
except Exception as e:
|
|
logger.error("Failed to fetch active orders: %s", e)
|
|
active_orders = []
|
|
# Build map: symbol → list of EXCHANGE STOP orders
|
|
stop_orders_by_sym = {}
|
|
for o in active_orders:
|
|
if o.get("type") == "EXCHANGE STOP":
|
|
stop_orders_by_sym.setdefault(o["symbol"], []).append(o)
|
|
|
|
# Build map: currency → wallet balance (for accurate stop-loss amounts)
|
|
wallet_balances = {}
|
|
for w in account_status.get("wallets", []):
|
|
if w.get("type") == "exchange":
|
|
wallet_balances[w["currency"]] = w.get("balance", 0)
|
|
|
|
for sym, pos in port.get("positions", {}).items():
|
|
if pos.get("amount", 0) <= 0:
|
|
continue
|
|
entry = pos.get("entry_price", 0)
|
|
if entry <= 0:
|
|
continue
|
|
|
|
# Use exchange wallet balance as the authoritative amount
|
|
currency = sym[1:].replace(":UST", "").replace("UST", "")
|
|
amount = wallet_balances.get(currency, 0)
|
|
if amount <= 0:
|
|
continue
|
|
|
|
# Skip stop-loss for positions below exchange minimum order size
|
|
min_amt = config.MIN_ORDER_AMOUNT.get(sym, 0)
|
|
if min_amt > 0 and amount < min_amt:
|
|
continue
|
|
|
|
expected_stop = round(entry * (1 - config.STOP_LOSS_PCT), 8)
|
|
existing_stops = stop_orders_by_sym.get(sym, [])
|
|
|
|
if existing_stops:
|
|
# Check if existing stop matches expected amount & price
|
|
stop = existing_stops[0]
|
|
stop_amt_ok = abs(abs(stop["amount"]) - amount) / amount < 0.01 # 1% tolerance
|
|
stop_px_ok = abs(stop["price"] - expected_stop) / expected_stop < 0.01
|
|
if stop_amt_ok and stop_px_ok:
|
|
continue # Stop order is correct
|
|
# Wrong amount or price — cancel all existing stops and re-place
|
|
for s in existing_stops:
|
|
trader.cancel_order(s["id"])
|
|
logger.info("Cancelled outdated stop %s for %s (amt=%.6f, px=%.6g)",
|
|
s["id"], sym, abs(s["amount"]), s["price"])
|
|
|
|
logger.warning("Position %s missing/outdated stop-loss, placing now", sym)
|
|
sl = trader.place_stop_loss_order(sym, amount, entry)
|
|
if sl:
|
|
# Update stop_orders_by_sym so step 10 sees the correct stop ID
|
|
stop_orders_by_sym[sym] = [{"id": sl["order_id"], "symbol": sym,
|
|
"amount": -amount, "price": sl["stop_price"],
|
|
"type": "EXCHANGE STOP"}]
|
|
pf.save_positions(port)
|
|
logger.info("Stop-loss for %s: %.6f @ %.6g", sym, amount, sl["stop_price"])
|
|
|
|
# 3. Fetch market data
|
|
try:
|
|
market_data = data_fetcher.fetch_all_market_data()
|
|
logger.info("Fetched market data for %d symbols", len(market_data))
|
|
except Exception as e:
|
|
logger.error("Failed to fetch market data: %s", e)
|
|
slack_notifier.send_error_alert(f"Market data fetch failed: {e}")
|
|
return
|
|
|
|
# 4. Calculate indicators
|
|
indicators_by_symbol = {}
|
|
current_prices = {}
|
|
for sym, md in market_data.items():
|
|
candles_df = md.get("candles")
|
|
if candles_df is not None and not candles_df.empty:
|
|
ind_df = indicators.calculate_indicators(candles_df)
|
|
indicators_by_symbol[sym] = ind_df
|
|
ticker = md.get("ticker", {})
|
|
if ticker:
|
|
current_prices[sym] = ticker.get("last_price", 0)
|
|
|
|
# 5. Cache data (persisted for next crontab run)
|
|
try:
|
|
data_fetcher.cache_market_data(market_data, indicators_by_symbol)
|
|
except Exception as e:
|
|
logger.warning("Cache write failed: %s", e)
|
|
|
|
# --- Collect cycle results ---
|
|
tp_closed = [] # take-profit closures
|
|
executed = [] # successfully executed trades
|
|
rejected = [] # signals rejected by risk manager
|
|
|
|
# 6. Check take-profit on existing positions (stop-loss is handled by exchange stop orders)
|
|
tp_signals = risk_manager.apply_stop_loss_take_profit(
|
|
port.get("positions", {}), current_prices
|
|
)
|
|
for signal in tp_signals:
|
|
sym = signal["symbol"]
|
|
pos = port.get("positions", {}).get(sym, {})
|
|
amount = pos.get("amount", 0)
|
|
price = current_prices.get(sym, 0)
|
|
if amount > 0 and price > 0:
|
|
# Cancel exchange stop-loss orders before selling (from real-time data)
|
|
for s in stop_orders_by_sym.get(sym, []):
|
|
trader.cancel_order(s["id"])
|
|
logger.info("Cancelled stop %s for %s before TP sell", s["id"], sym)
|
|
|
|
# Use wallet balance as authoritative sell amount
|
|
currency = sym[1:].replace(":UST", "").replace("UST", "")
|
|
wallet_amt = wallet_balances.get(currency, 0)
|
|
if wallet_amt > 0:
|
|
amount = wallet_amt
|
|
signal["sell_amount"] = amount
|
|
signal["amount_usdt"] = amount * price
|
|
result = trader.execute_trade(signal, current_prices)
|
|
if result and result.get("status") in ("filled", "submitted"):
|
|
port = pf.update_position(port, sym, "SELL", amount, price, signal["amount_usdt"])
|
|
pf.save_positions(port)
|
|
trade_logger.log_trade(result)
|
|
tp_closed.append({**signal, "amount_usdt": amount * price})
|
|
logger.info("Take profit executed: %s", sym)
|
|
|
|
# 7. Build indicator summary for LLM
|
|
indicator_summary = indicators.summarize_all(indicators_by_symbol)
|
|
|
|
# 8. Build account status string for LLM
|
|
account_str = _build_account_string(port, current_prices)
|
|
|
|
# 9. Call LLM for analysis
|
|
signals = []
|
|
try:
|
|
signals = llm_analyzer.analyze_market(indicator_summary, account_str)
|
|
logger.info("LLM returned %d signals", len(signals))
|
|
except Exception as e:
|
|
logger.error("LLM analysis failed: %s", e)
|
|
slack_notifier.send_error_alert(f"LLM analysis failed: {e}")
|
|
|
|
# 10. Validate and execute trades
|
|
for signal in signals:
|
|
action = signal.get("action", "HOLD")
|
|
if action == "HOLD":
|
|
continue
|
|
|
|
ind_df = indicators_by_symbol.get(signal.get("symbol"))
|
|
validated, reject_reason = risk_manager.validate_trade(signal, port, ind_df)
|
|
if validated is None:
|
|
logger.info("Signal rejected by risk manager: %s %s — %s", action, signal.get("symbol"), reject_reason)
|
|
rejected.append({**signal, "reject_reason": reject_reason})
|
|
continue
|
|
|
|
sym = validated["symbol"]
|
|
|
|
if action == "SELL":
|
|
# Cancel exchange stop-loss orders BEFORE selling (free up locked balance)
|
|
for s in stop_orders_by_sym.get(sym, []):
|
|
trader.cancel_order(s["id"])
|
|
logger.info("Cancelled stop %s for %s before sell", s["id"], sym)
|
|
# Use wallet balance as authoritative sell amount
|
|
currency = sym[1:].replace(":UST", "").replace("UST", "")
|
|
wallet_amt = wallet_balances.get(currency, 0)
|
|
if wallet_amt > 0:
|
|
validated["sell_amount"] = wallet_amt
|
|
|
|
result = trader.execute_trade(validated, current_prices)
|
|
if result and result.get("status") == "failed":
|
|
logger.warning("Order failed at exchange: %s %s — %s", action, validated["symbol"], result.get("error", ""))
|
|
rejected.append({**validated, "reject_reason": f"交易所錯誤: {result.get('error', 'unknown')}"})
|
|
continue
|
|
|
|
if result and result.get("status") in ("filled", "submitted"):
|
|
amount = result.get("amount", 0)
|
|
price = result.get("price", 0)
|
|
amount_usdt = result.get("amount_usdt", 0)
|
|
|
|
port = pf.update_position(port, sym, action, amount, price, amount_usdt)
|
|
|
|
stop_price = None
|
|
if action == "BUY":
|
|
# Cancel existing exchange stop orders before placing new one
|
|
pos = port.get("positions", {}).get(sym, {})
|
|
for s in stop_orders_by_sym.get(sym, []):
|
|
trader.cancel_order(s["id"])
|
|
logger.info("Cancelled old stop %s for %s (position size changed)", s["id"], sym)
|
|
|
|
# Place new stop-loss for TOTAL position amount at new avg entry
|
|
total_amount = pos.get("amount", amount)
|
|
entry_price = pos.get("entry_price", price)
|
|
sl = trader.place_stop_loss_order(sym, total_amount, entry_price)
|
|
if sl:
|
|
stop_price = sl["stop_price"]
|
|
# Update stop_orders_by_sym so subsequent actions in this cycle see it
|
|
stop_orders_by_sym[sym] = [{"id": sl["order_id"], "symbol": sym,
|
|
"amount": -total_amount, "price": stop_price,
|
|
"type": "EXCHANGE STOP"}]
|
|
logger.info("Stop-loss set for %s: %.6f @ %.6g", sym, total_amount, stop_price)
|
|
|
|
pf.save_positions(port)
|
|
trade_logger.log_trade(result)
|
|
executed.append({**result, "stop_price": stop_price})
|
|
|
|
# 10b. Post-trade wallet refresh — ensure next trade uses latest balances
|
|
if executed or tp_closed:
|
|
try:
|
|
refreshed = data_fetcher.fetch_account_status()
|
|
port = pf.sync_with_exchange(port, refreshed)
|
|
# Rebuild wallet_balances for any remaining logic
|
|
wallet_balances = {}
|
|
for w in refreshed.get("wallets", []):
|
|
if w.get("type") == "exchange":
|
|
wallet_balances[w["currency"]] = w.get("balance", 0)
|
|
logger.info("Post-trade wallet refresh: %.2f USDT available", port.get("available_usdt", 0))
|
|
except Exception as e:
|
|
logger.warning("Post-trade wallet refresh failed: %s", e)
|
|
|
|
# 11. Send unified cycle report to Slack
|
|
portfolio_summary = _build_portfolio_one_liner(port, current_prices)
|
|
slack_notifier.send_cycle_report(
|
|
cycle_number=state["run_count"],
|
|
signals=signals,
|
|
executed=executed,
|
|
rejected=rejected,
|
|
tp_closed=tp_closed,
|
|
portfolio_summary=portfolio_summary,
|
|
)
|
|
|
|
# 12. Save final state
|
|
pf.save_positions(port)
|
|
|
|
# 13. Hourly status report (check elapsed time across crontab runs)
|
|
now = time.time()
|
|
last_report = state.get("last_status_report", 0)
|
|
if now - last_report >= config.STATUS_REPORT_INTERVAL_MINUTES * 60:
|
|
summary = pf.get_portfolio_summary(port, current_prices)
|
|
slack_notifier.send_status_update(summary)
|
|
logger.info("Hourly status report sent")
|
|
state["last_status_report"] = now
|
|
|
|
state["last_run"] = now
|
|
_save_state(state)
|
|
logger.info("Cycle complete")
|
|
|
|
|
|
def _build_portfolio_one_liner(port: dict, current_prices: dict) -> str:
|
|
"""Build a one-line portfolio summary for the cycle report."""
|
|
available = port.get("available_usdt", 0)
|
|
positions = port.get("positions", {})
|
|
pos_count = 0
|
|
total_cost = 0.0
|
|
total_market_value = 0.0
|
|
for sym, pos in positions.items():
|
|
amount = pos.get("amount", 0)
|
|
entry = pos.get("entry_price", 0)
|
|
current = current_prices.get(sym, 0)
|
|
min_amt = config.MIN_ORDER_AMOUNT.get(sym, 0)
|
|
if amount > 0 and entry > 0 and amount >= min_amt:
|
|
pos_count += 1
|
|
total_cost += amount * entry
|
|
if current > 0:
|
|
total_market_value += amount * current
|
|
unrealized = total_market_value - total_cost
|
|
total_value = available + total_market_value
|
|
total_capital = available + total_cost
|
|
total_return_pct = ((total_value / total_capital - 1) * 100) if total_capital > 0 else 0.0
|
|
return (
|
|
f"總值 {total_value:.2f} USDT | 總收益 {total_return_pct:+.2f}% | "
|
|
f"{available:.2f} 可用 | 持倉 {pos_count} 筆"
|
|
)
|
|
|
|
|
|
def _build_account_string(port: dict, current_prices: dict) -> str:
|
|
"""Build a concise account status string for the LLM prompt."""
|
|
lines = []
|
|
lines.append(f"Total Balance: {port.get('total_balance_usdt', 0):.2f} USDT")
|
|
lines.append(f"Available: {port.get('available_usdt', 0):.2f} USDT")
|
|
|
|
positions = port.get("positions", {})
|
|
if positions:
|
|
lines.append("\nOpen Positions:")
|
|
for sym, pos in positions.items():
|
|
amount = pos.get("amount", 0)
|
|
entry = pos.get("entry_price", 0)
|
|
current = current_prices.get(sym, 0)
|
|
pnl_pct = ((current - entry) / entry * 100) if entry > 0 and current > 0 else 0
|
|
name = config.SYMBOL_NAMES.get(sym, sym)
|
|
lines.append(
|
|
f" {name}: {amount:.6f} @ {entry:.6g} "
|
|
f"(now {current:.6g}, {pnl_pct:+.2f}%)"
|
|
)
|
|
else:
|
|
lines.append("\nNo open positions.")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
mode = "PAPER" if config.PAPER_TRADING else "LIVE"
|
|
logger.info("Running in %s mode, monitoring %d symbols", mode, len(config.TOP_15_SYMBOLS))
|
|
run_cycle()
|