#!/usr/bin/env python3 """LLM-driven cryptocurrency trading system — single-run mode for crontab.""" import json import logging import os import sys import time from datetime import datetime import config import data_fetcher import indicators import llm_analyzer import portfolio as pf import risk_manager import slack_notifier import trade_logger import trader # --------------------------------------------------------------------------- # Logging setup # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("trading.log"), ], ) logger = logging.getLogger("main") STATE_FILE = "bot_state.json" def _load_state() -> dict: if os.path.exists(STATE_FILE): try: with open(STATE_FILE) as f: return json.load(f) except (json.JSONDecodeError, IOError): pass return {"last_status_report": 0, "run_count": 0} def _save_state(state: dict): with open(STATE_FILE, "w") as f: json.dump(state, f, indent=2) def run_cycle(): """Execute one full trading cycle.""" state = _load_state() state["run_count"] = state.get("run_count", 0) + 1 logger.info("=" * 60) logger.info("Trading cycle #%d at %s", state["run_count"], datetime.now().isoformat()) # 1. Load portfolio state port = pf.load_positions() # 2. Fetch account status from exchange try: account_status = data_fetcher.fetch_account_status() port = pf.sync_with_exchange(port, account_status) logger.info( "Account: %.2f USDT total, %.2f USDT available", port.get("total_balance_usdt", 0), port.get("available_usdt", 0), ) except Exception as e: logger.error("Failed to fetch account status: %s", e) if config.PAPER_TRADING and port.get("total_balance_usdt", 0) == 0: port["total_balance_usdt"] = 10000 port["available_usdt"] = 10000 logger.info("Paper trading: using default 10000 USDT balance") # 2b. Sync stop-loss orders with exchange (real-time, not local memory) try: active_orders = data_fetcher.fetch_active_orders() except Exception as e: logger.error("Failed to fetch active orders: %s", e) active_orders = [] # Build map: symbol → list of EXCHANGE STOP orders stop_orders_by_sym = {} for o in active_orders: if o.get("type") == "EXCHANGE STOP": stop_orders_by_sym.setdefault(o["symbol"], []).append(o) # Build map: currency → wallet balance (for accurate stop-loss amounts) wallet_balances = {} for w in account_status.get("wallets", []): if w.get("type") == "exchange": wallet_balances[w["currency"]] = w.get("balance", 0) for sym, pos in port.get("positions", {}).items(): if pos.get("amount", 0) <= 0: continue entry = pos.get("entry_price", 0) if entry <= 0: continue # Use exchange wallet balance as the authoritative amount currency = sym[1:].replace(":UST", "").replace("UST", "") amount = wallet_balances.get(currency, 0) if amount <= 0: continue # Skip stop-loss for positions below exchange minimum order size min_amt = config.MIN_ORDER_AMOUNT.get(sym, 0) if min_amt > 0 and amount < min_amt: continue expected_stop = round(entry * (1 - config.STOP_LOSS_PCT), 8) existing_stops = stop_orders_by_sym.get(sym, []) if existing_stops: # Check if existing stop matches expected amount & price stop = existing_stops[0] stop_amt_ok = abs(abs(stop["amount"]) - amount) / amount < 0.01 # 1% tolerance stop_px_ok = abs(stop["price"] - expected_stop) / expected_stop < 0.01 if stop_amt_ok and stop_px_ok: continue # Stop order is correct # Wrong amount or price — cancel all existing stops and re-place for s in existing_stops: trader.cancel_order(s["id"]) logger.info("Cancelled outdated stop %s for %s (amt=%.6f, px=%.6g)", s["id"], sym, abs(s["amount"]), s["price"]) logger.warning("Position %s missing/outdated stop-loss, placing now", sym) sl = trader.place_stop_loss_order(sym, amount, entry) if sl: # Update stop_orders_by_sym so step 10 sees the correct stop ID stop_orders_by_sym[sym] = [{"id": sl["order_id"], "symbol": sym, "amount": -amount, "price": sl["stop_price"], "type": "EXCHANGE STOP"}] pf.save_positions(port) logger.info("Stop-loss for %s: %.6f @ %.6g", sym, amount, sl["stop_price"]) # 3. Fetch market data try: market_data = data_fetcher.fetch_all_market_data() logger.info("Fetched market data for %d symbols", len(market_data)) except Exception as e: logger.error("Failed to fetch market data: %s", e) slack_notifier.send_error_alert(f"Market data fetch failed: {e}") return # 4. Calculate indicators indicators_by_symbol = {} current_prices = {} for sym, md in market_data.items(): candles_df = md.get("candles") if candles_df is not None and not candles_df.empty: ind_df = indicators.calculate_indicators(candles_df) indicators_by_symbol[sym] = ind_df ticker = md.get("ticker", {}) if ticker: current_prices[sym] = ticker.get("last_price", 0) # 5. Cache data (persisted for next crontab run) try: data_fetcher.cache_market_data(market_data, indicators_by_symbol) except Exception as e: logger.warning("Cache write failed: %s", e) # --- Collect cycle results --- tp_closed = [] # take-profit closures executed = [] # successfully executed trades rejected = [] # signals rejected by risk manager # 6. Check take-profit on existing positions (stop-loss is handled by exchange stop orders) tp_signals = risk_manager.apply_stop_loss_take_profit( port.get("positions", {}), current_prices ) for signal in tp_signals: sym = signal["symbol"] pos = port.get("positions", {}).get(sym, {}) amount = pos.get("amount", 0) price = current_prices.get(sym, 0) if amount > 0 and price > 0: # Cancel exchange stop-loss orders before selling (from real-time data) for s in stop_orders_by_sym.get(sym, []): trader.cancel_order(s["id"]) logger.info("Cancelled stop %s for %s before TP sell", s["id"], sym) # Use wallet balance as authoritative sell amount currency = sym[1:].replace(":UST", "").replace("UST", "") wallet_amt = wallet_balances.get(currency, 0) if wallet_amt > 0: amount = wallet_amt signal["sell_amount"] = amount signal["amount_usdt"] = amount * price result = trader.execute_trade(signal, current_prices) if result and result.get("status") in ("filled", "submitted"): port = pf.update_position(port, sym, "SELL", amount, price, signal["amount_usdt"]) pf.save_positions(port) trade_logger.log_trade(result) tp_closed.append({**signal, "amount_usdt": amount * price}) logger.info("Take profit executed: %s", sym) # 7. Build indicator summary for LLM indicator_summary = indicators.summarize_all(indicators_by_symbol) # 8. Build account status string for LLM account_str = _build_account_string(port, current_prices) # 9. Call LLM for analysis signals = [] try: signals = llm_analyzer.analyze_market(indicator_summary, account_str) logger.info("LLM returned %d signals", len(signals)) except Exception as e: logger.error("LLM analysis failed: %s", e) slack_notifier.send_error_alert(f"LLM analysis failed: {e}") # 10. Validate and execute trades for signal in signals: action = signal.get("action", "HOLD") if action == "HOLD": continue ind_df = indicators_by_symbol.get(signal.get("symbol")) validated, reject_reason = risk_manager.validate_trade(signal, port, ind_df) if validated is None: logger.info("Signal rejected by risk manager: %s %s — %s", action, signal.get("symbol"), reject_reason) rejected.append({**signal, "reject_reason": reject_reason}) continue sym = validated["symbol"] if action == "SELL": # Cancel exchange stop-loss orders BEFORE selling (free up locked balance) for s in stop_orders_by_sym.get(sym, []): trader.cancel_order(s["id"]) logger.info("Cancelled stop %s for %s before sell", s["id"], sym) # Use wallet balance as authoritative sell amount currency = sym[1:].replace(":UST", "").replace("UST", "") wallet_amt = wallet_balances.get(currency, 0) if wallet_amt > 0: validated["sell_amount"] = wallet_amt result = trader.execute_trade(validated, current_prices) if result and result.get("status") == "failed": logger.warning("Order failed at exchange: %s %s — %s", action, validated["symbol"], result.get("error", "")) rejected.append({**validated, "reject_reason": f"交易所錯誤: {result.get('error', 'unknown')}"}) continue if result and result.get("status") in ("filled", "submitted"): amount = result.get("amount", 0) price = result.get("price", 0) amount_usdt = result.get("amount_usdt", 0) port = pf.update_position(port, sym, action, amount, price, amount_usdt) stop_price = None if action == "BUY": # Cancel existing exchange stop orders before placing new one pos = port.get("positions", {}).get(sym, {}) for s in stop_orders_by_sym.get(sym, []): trader.cancel_order(s["id"]) logger.info("Cancelled old stop %s for %s (position size changed)", s["id"], sym) # Place new stop-loss for TOTAL position amount at new avg entry total_amount = pos.get("amount", amount) entry_price = pos.get("entry_price", price) sl = trader.place_stop_loss_order(sym, total_amount, entry_price) if sl: stop_price = sl["stop_price"] # Update stop_orders_by_sym so subsequent actions in this cycle see it stop_orders_by_sym[sym] = [{"id": sl["order_id"], "symbol": sym, "amount": -total_amount, "price": stop_price, "type": "EXCHANGE STOP"}] logger.info("Stop-loss set for %s: %.6f @ %.6g", sym, total_amount, stop_price) pf.save_positions(port) trade_logger.log_trade(result) executed.append({**result, "stop_price": stop_price}) # 10b. Post-trade wallet refresh — ensure next trade uses latest balances if executed or tp_closed: try: refreshed = data_fetcher.fetch_account_status() port = pf.sync_with_exchange(port, refreshed) # Rebuild wallet_balances for any remaining logic wallet_balances = {} for w in refreshed.get("wallets", []): if w.get("type") == "exchange": wallet_balances[w["currency"]] = w.get("balance", 0) logger.info("Post-trade wallet refresh: %.2f USDT available", port.get("available_usdt", 0)) except Exception as e: logger.warning("Post-trade wallet refresh failed: %s", e) # 11. Send unified cycle report to Slack portfolio_summary = _build_portfolio_one_liner(port, current_prices) slack_notifier.send_cycle_report( cycle_number=state["run_count"], signals=signals, executed=executed, rejected=rejected, tp_closed=tp_closed, portfolio_summary=portfolio_summary, ) # 12. Save final state pf.save_positions(port) # 13. Hourly status report (check elapsed time across crontab runs) now = time.time() last_report = state.get("last_status_report", 0) if now - last_report >= config.STATUS_REPORT_INTERVAL_MINUTES * 60: summary = pf.get_portfolio_summary(port, current_prices) slack_notifier.send_status_update(summary) logger.info("Hourly status report sent") state["last_status_report"] = now state["last_run"] = now _save_state(state) logger.info("Cycle complete") def _build_portfolio_one_liner(port: dict, current_prices: dict) -> str: """Build a one-line portfolio summary for the cycle report.""" available = port.get("available_usdt", 0) positions = port.get("positions", {}) pos_count = 0 total_cost = 0.0 total_market_value = 0.0 for sym, pos in positions.items(): amount = pos.get("amount", 0) entry = pos.get("entry_price", 0) current = current_prices.get(sym, 0) min_amt = config.MIN_ORDER_AMOUNT.get(sym, 0) if amount > 0 and entry > 0 and amount >= min_amt: pos_count += 1 total_cost += amount * entry if current > 0: total_market_value += amount * current unrealized = total_market_value - total_cost total_value = available + total_market_value total_capital = available + total_cost total_return_pct = ((total_value / total_capital - 1) * 100) if total_capital > 0 else 0.0 return ( f"總值 {total_value:.2f} USDT | 總收益 {total_return_pct:+.2f}% | " f"{available:.2f} 可用 | 持倉 {pos_count} 筆" ) def _build_account_string(port: dict, current_prices: dict) -> str: """Build a concise account status string for the LLM prompt.""" lines = [] lines.append(f"Total Balance: {port.get('total_balance_usdt', 0):.2f} USDT") lines.append(f"Available: {port.get('available_usdt', 0):.2f} USDT") positions = port.get("positions", {}) if positions: lines.append("\nOpen Positions:") for sym, pos in positions.items(): amount = pos.get("amount", 0) entry = pos.get("entry_price", 0) current = current_prices.get(sym, 0) pnl_pct = ((current - entry) / entry * 100) if entry > 0 and current > 0 else 0 name = config.SYMBOL_NAMES.get(sym, sym) lines.append( f" {name}: {amount:.6f} @ {entry:.6g} " f"(now {current:.6g}, {pnl_pct:+.2f}%)" ) else: lines.append("\nNo open positions.") return "\n".join(lines) if __name__ == "__main__": mode = "PAPER" if config.PAPER_TRADING else "LIVE" logger.info("Running in %s mode, monitoring %d symbols", mode, len(config.TOP_15_SYMBOLS)) run_cycle()