bifitnex-trading/sync_cost_basis.py
kroutony b6bd45b151 Fix SELL P&L calculation, add debug logging, and multiple improvements
- Fix realized_pnl always being 0 on SELL (use amount*price instead of amount_usdt)
- Add debug logging for all-HOLD LLM cycles (log sample HOLD reasons)
- Fix nonce collision in Bitfinex API auth (add counter)
- Fix timezone to UTC+8 in main, trade_logger, sync_cost_basis, check_errors
- Fix stop-loss retry with longer delay after cancel
- Add min order amount check in trader before BUY
- Fix risk_manager to skip positions with amount <= 0
- Cap trade_history to 500 entries to prevent unbounded growth
- Fix greedy regex in LLM response parser
- Reset cost_tracking start dates to null

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-17 06:23:54 +00:00

345 lines
12 KiB
Python

#!/usr/bin/env python3
"""Sync cost basis (entry_price), wallet balances, and per-coin amounts from order history.
Designed to run every 30 minutes via cron.
"""
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone, timedelta
import config
import data_fetcher
import slack_notifier
# ---------------------------------------------------------------------------
# Logging setup (separate log file)
# ---------------------------------------------------------------------------
# Fixed UTC+8 offset used to render log timestamps, independent of the host's
# local time zone.
_TZ_UTC8 = timezone(timedelta(hours=8))

# Log to stdout and to a dedicated file next to the working directory.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("sync_cost_basis.log"),
    ],
)


def _utc8_converter(*args):
    """Convert an epoch timestamp to a UTC+8 ``struct_time`` for log formatting.

    ``logging.Formatter.formatTime`` calls ``self.converter(record.created)``;
    because the converter is stored on the class it may be invoked either as
    ``converter(secs)`` or as ``converter(formatter, secs)``, so the timestamp
    is always the last positional argument. Using that timestamp (instead of
    the current time) keeps ``%(asctime)s`` equal to the moment the record was
    created even when it is formatted later.
    """
    if args:
        moment = datetime.fromtimestamp(args[-1], _TZ_UTC8)
    else:
        # Tolerate a bare call with no timestamp by falling back to "now".
        moment = datetime.now(_TZ_UTC8)
    return moment.timetuple()


logging.Formatter.converter = _utc8_converter
logger = logging.getLogger("sync_cost_basis")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _symbol_to_currency(symbol: str) -> str:
    """Convert a symbol like 'tETHUST' or 'tAVAX:UST' to a currency code like 'ETH'.

    The first character (the 't' prefix) is dropped, then a trailing ':UST' or
    'UST' quote suffix is removed. Only the suffix is stripped, so a currency
    code that itself contains 'UST' (e.g. 'tTERRAUST:UST' -> 'TERRAUST') is
    left intact. Symbols without a UST suffix are returned minus the prefix.
    """
    pair = symbol[1:]
    # Check the longer ':UST' form first so the colon is removed with it.
    for quote_suffix in (":UST", "UST"):
        if pair.endswith(quote_suffix):
            return pair[: -len(quote_suffix)]
    return pair
def _build_currency_to_symbol() -> dict[str, str]:
    """Return a lookup from currency code to its symbol in config.TOP_15_SYMBOLS.

    If two symbols map to the same currency code, the later one wins.
    """
    lookup: dict[str, str] = {}
    for pair_symbol in config.TOP_15_SYMBOLS:
        lookup[_symbol_to_currency(pair_symbol)] = pair_symbol
    return lookup
# ---------------------------------------------------------------------------
# Cost tracking persistence
# ---------------------------------------------------------------------------
def load_cost_tracking() -> dict:
    """Load the cost-tracking mapping from config.COST_TRACKING_FILE.

    Returns:
        The parsed JSON object, or an empty dict when the file does not exist,
        cannot be read or parsed, or does not contain a JSON object. Unreadable
        or malformed content is logged rather than silently ignored, because
        the caller re-initialises (and overwrites) the tracking file whenever
        this returns an empty mapping.
    """
    path = config.COST_TRACKING_FILE
    try:
        with open(path) as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been tracked yet.
        return {}
    except (json.JSONDecodeError, OSError) as exc:
        logger.warning("Ignoring unreadable cost tracking file %s: %s", path, exc)
        return {}
    if not isinstance(data, dict):
        logger.warning(
            "Ignoring cost tracking file %s: expected a JSON object, got %s",
            path, type(data).__name__,
        )
        return {}
    return data
def save_cost_tracking(data: dict):
    """Persist the cost-tracking mapping to config.COST_TRACKING_FILE as JSON.

    The data is first written to a sibling '<file>.tmp' and then moved over the
    target with os.replace, so an interrupted or failed write never leaves a
    truncated tracking file behind (a truncated file would be read back as
    "no tracking data" on the next run).
    """
    target = config.COST_TRACKING_FILE
    tmp_path = f"{target}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, target)
def init_cost_tracking(positions: dict) -> dict:
    """Build a fresh tracking map for the given positions.

    Every symbol in *positions* is assigned config.INIT_COST_BASIS_START_MS as
    its start value.
    """
    return {symbol: config.INIT_COST_BASIS_START_MS for symbol in positions}
# ---------------------------------------------------------------------------
# Fetch order history with pagination
# ---------------------------------------------------------------------------
def fetch_orders_hist(symbol: str, start_ms: int) -> list[list]:
    """Fetch the order history for *symbol* from start_ms to now, with pagination.

    Pages of up to 500 rows are requested from
    ``/v2/auth/r/orders/{symbol}/hist`` via ``data_fetcher._auth_post``, walking
    backwards in time by lowering the ``end`` bound to the oldest ``mts_create``
    (row index 4) seen so far.

    Args:
        symbol: Trading symbol used in the request path.
        start_ms: Lower time bound in milliseconds, passed through as ``start``.

    Returns:
        Order rows sorted ascending by ``mts_create``. On a request failure the
        rows collected so far are returned (possibly an empty list); the
        failure is logged, not raised.
    """
    page_limit = 500
    path = f"/v2/auth/r/orders/{symbol}/hist"
    all_orders: list[list] = []
    seen_ids = set()
    end_ms = int(time.time() * 1000)
    while True:
        body = {"start": start_ms, "end": end_ms, "limit": page_limit}
        try:
            raw = data_fetcher._auth_post(path, body)
        except Exception as e:
            logger.error("Failed to fetch orders for %s: %s", symbol, e)
            if all_orders:
                # Older pages are missing, so anything derived from this
                # history may be off.
                logger.warning(
                    "Order history for %s is incomplete (%d orders fetched before failure)",
                    symbol, len(all_orders),
                )
            break
        if not raw or not isinstance(raw, list):
            break
        # Keep only well-formed rows not seen on an earlier page. Rows that are
        # not sequences (e.g. the scalar members of an error payload) would
        # otherwise break the min()/sort() below.
        fresh = []
        for row in raw:
            if not isinstance(row, (list, tuple)) or len(row) <= 4:
                continue
            if not isinstance(row[4], (int, float)):
                continue
            if row[0] in seen_ids:
                continue
            seen_ids.add(row[0])
            fresh.append(row)
        if not fresh:
            # No progress: stop instead of requesting the same page forever.
            break
        all_orders.extend(fresh)
        if len(raw) < page_limit:
            break
        # Move the cursor to the oldest creation time on this page. The bound
        # is kept inclusive so orders sharing that exact millisecond are not
        # skipped when the page was cut off by the limit; rows returned twice
        # are dropped by the id check above.
        # NOTE(review): assumes the endpoint filters `end` on the same
        # timestamp as row[4] and returns newest rows first -- confirm.
        end_ms = min(row[4] for row in fresh)
    # Sort by mts_create ascending (the API's own ordering is not relied on).
    all_orders.sort(key=lambda o: o[4])
    return all_orders
# ---------------------------------------------------------------------------
# Cost basis calculation
# ---------------------------------------------------------------------------
def calculate_cost_basis(orders: list[list]) -> tuple[float, float]:
    """Replay orders in sequence and derive the running average-cost position.

    Returns ``(amount, entry_price)`` where ``entry_price`` is the remaining
    cost divided by the remaining amount, or 0.0 when nothing is held.

    Order row layout: [id, gid, cid, symbol, mts_create, mts_update, amount,
    amount_orig, type, type_prev, mts_tif, _, flags, status, _, _, price,
    price_avg, price_trailing, ...]
    - index 6  (amount): remaining amount (0 if fully filled)
    - index 7  (amount_orig): original amount (positive=buy, negative=sell)
    - index 13 (status): must be a string containing "EXECUTED" or
      "PARTIALLY FILLED" for the row to count
    - index 17 (price_avg): average execution price; rows without a positive
      value are ignored
    """
    held = 0.0
    cost = 0.0
    for row in orders:
        state = row[13] if len(row) > 13 else ""
        if not isinstance(state, str):
            continue
        # Only rows that report at least a partial fill contribute.
        if "EXECUTED" not in state and "PARTIALLY FILLED" not in state:
            continue

        signed_qty = float(row[7])  # sign encodes side: > 0 buy, otherwise sell
        unfilled = float(row[6])
        if len(row) > 17 and row[17]:
            avg_px = float(row[17])
        else:
            avg_px = 0
        if avg_px <= 0:
            continue

        # Executed quantity is the original amount minus what is still open.
        filled = abs(signed_qty - unfilled)
        if filled <= 0:
            continue

        if signed_qty > 0:
            # Buy: add to both the cost pool and the holding.
            cost += filled * avg_px
            held += filled
            continue

        # Sell: release cost in proportion to the share of the holding sold,
        # which leaves the average entry price unchanged.
        if held > 0:
            sold_fraction = min(filled / held, 1.0)
            cost *= (1 - sold_fraction)
        held -= filled
        if held < 0:
            # Sold more than the replayed history accounts for: start over flat.
            held = 0.0
            cost = 0.0

    if held > 0:
        return held, cost / held
    return held, 0.0
# ---------------------------------------------------------------------------
# Main sync logic
# ---------------------------------------------------------------------------
def _collect_exchange_wallets(account_status: dict) -> tuple[dict[str, float], float, float]:
    """Split exchange wallets into coin balances and the USDT balance/available pair."""
    coin_balances: dict[str, float] = {}
    usdt_balance = 0.0
    usdt_available = 0.0
    for w in account_status.get("wallets", []):
        if w.get("type") != "exchange":
            continue
        currency = w["currency"]
        bal = w.get("balance") or 0
        if currency in ("UST", "USDT"):
            usdt_balance = bal
            # Fall back to the balance only when "available" is missing; an
            # available amount of exactly 0 (everything reserved) is real.
            available = w.get("available")
            usdt_available = bal if available is None else available
        elif bal > 0:
            coin_balances[currency] = bal
    return coin_balances, usdt_balance, usdt_available


def _first_filled_buy_mts(orders: list[list]):
    """Return mts_create of the earliest buy that calculate_cost_basis would count, else None."""
    for order in orders:
        status = order[13] if len(order) > 13 else ""
        if not isinstance(status, str):
            continue
        # Same fill criteria as calculate_cost_basis, so moving the tracking
        # start to this order never drops a buy that contributes to the basis.
        if "EXECUTED" not in status and "PARTIALLY FILLED" not in status:
            continue
        amount_orig = float(order[7])
        if amount_orig <= 0:
            continue
        price_avg = float(order[17]) if len(order) > 17 and order[17] else 0
        if price_avg <= 0:
            continue
        if abs(amount_orig - float(order[6])) <= 0:
            continue
        return order[4]  # mts_create
    return None


def _write_json_atomic(path, payload) -> None:
    """Write payload as indented JSON to path via a temp file and os.replace."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(payload, f, indent=2)
    os.replace(tmp_path, path)


def run_sync():
    """Reconcile stored positions and cost tracking with wallets and order history.

    Steps:
      1. Fetch the account status and read exchange wallet balances.
      2. Load cost tracking and the positions file (initialising tracking from
         the current positions when it is empty).
      3. Collect symbols from positions, active tracking entries and wallets.
      4. For each symbol, fetch its order history, recompute the entry price,
         warn on Slack when the computed amount deviates more than 5% from the
         wallet, and update the position (wallet amount is authoritative).
      5. Drop positions whose currency is no longer held.
      6. Save the positions file, including the USDT balance figures.
      7. Save cost tracking.

    Returns None. If the account status cannot be fetched the sync is aborted
    before anything is written.
    """
    logger.info("=" * 60)
    logger.info("Starting cost basis sync")

    # 1. Fetch wallet balances
    try:
        account_status = data_fetcher.fetch_account_status()
    except Exception as e:
        logger.error("Failed to fetch account status: %s", e)
        return
    if not isinstance(account_status, dict):
        logger.error("Failed to fetch account status: %s", "unexpected response type")
        return
    wallet_balances, usdt_balance, usdt_available = _collect_exchange_wallets(account_status)
    currency_to_symbol = _build_currency_to_symbol()

    # 2. Load cost tracking and positions
    cost_tracking = load_cost_tracking()
    positions_data = {}
    if os.path.exists(config.POSITIONS_FILE):
        try:
            with open(config.POSITIONS_FILE) as f:
                positions_data = json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            # Best effort: continue with empty data, but leave a trace because
            # the file is rewritten at the end of this run.
            logger.warning(
                "Could not read %s, continuing with empty positions: %s",
                config.POSITIONS_FILE, e,
            )
    if not isinstance(positions_data, dict):
        positions_data = {}
    positions = positions_data.get("positions") or {}

    # Initialize cost_tracking if empty
    if not cost_tracking:
        cost_tracking = init_cost_tracking(positions)
        save_cost_tracking(cost_tracking)
        logger.info("Initialized cost_tracking.json with %d symbols", len(cost_tracking))

    # 3. Determine which symbols to process
    # Symbols from: current positions + cost_tracking + wallet balances
    all_symbols = set(positions)
    all_symbols.update(k for k, v in cost_tracking.items() if v is not None)
    for currency in wallet_balances:
        sym = currency_to_symbol.get(currency)
        if sym:
            all_symbols.add(sym)

    # 4. Process each symbol
    for sym in sorted(all_symbols):
        currency = _symbol_to_currency(sym)
        wallet_amt = wallet_balances.get(currency, 0)
        start_ms = cost_tracking.get(sym)

        # If no start_ms and no wallet balance, skip
        if start_ms is None and wallet_amt <= 0:
            continue
        # If wallet has balance but no tracking entry, initialize
        if start_ms is None and wallet_amt > 0:
            start_ms = config.INIT_COST_BASIS_START_MS
            cost_tracking[sym] = start_ms

        logger.info("Processing %s (wallet: %.6f, start_ms: %s)", sym, wallet_amt, start_ms)

        # Fetch order history
        orders = fetch_orders_hist(sym, start_ms)
        logger.info(" Fetched %d orders for %s", len(orders), sym)

        if not orders:
            # No orders found: keep existing entry price but update amount from wallet
            if wallet_amt > 0:
                existing = positions.get(sym, {})
                existing_entry = existing.get("entry_price", 0) or 0
                if existing_entry > 0:
                    positions[sym] = {
                        "amount": wallet_amt,
                        "entry_price": existing_entry,
                        "value_usdt": wallet_amt * existing_entry,
                        "last_update": time.time(),
                    }
            elif sym in positions:
                # No wallet balance and no orders: remove position
                positions.pop(sym, None)
                cost_tracking[sym] = None
            time.sleep(0.5)
            continue

        # Calculate cost basis
        calc_amount, entry_price = calculate_cost_basis(orders)

        # Deviation check: calculated amount vs wallet balance
        if wallet_amt > 0 and calc_amount > 0:
            deviation = abs(calc_amount - wallet_amt) / wallet_amt
            logger.info(
                " %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g, deviation=%.2f%%",
                sym, calc_amount, wallet_amt, entry_price, deviation * 100,
            )
            if deviation > 0.05:
                msg = (
                    f"<!channel>\n"
                    f"⚠️ *成本基礎偏差警告*\n"
                    f"幣種: {config.SYMBOL_NAMES.get(sym, sym)}\n"
                    f"計算量: {calc_amount:.6f}\n"
                    f"錢包量: {wallet_amt:.6f}\n"
                    f"偏差: {deviation:.2%}\n"
                    f"建議檢查 cost_tracking 起始日期"
                )
                # The alert is advisory: a notification failure must not abort
                # the sync before positions and tracking are saved.
                try:
                    slack_notifier._send({"text": msg})
                except Exception as e:
                    logger.error(" Failed to send deviation alert for %s: %s", sym, e)
                else:
                    logger.warning(" Deviation > 5%% for %s, Slack notified", sym)
        else:
            logger.info(
                " %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g",
                sym, calc_amount, wallet_amt, entry_price,
            )

        # Update positions: use wallet amount as authoritative, calculated entry_price
        if wallet_amt > 0 and entry_price > 0:
            positions[sym] = {
                "amount": wallet_amt,
                "entry_price": entry_price,
                "value_usdt": wallet_amt * entry_price,
                "last_update": time.time(),
            }
        elif wallet_amt <= 0:
            # Fully sold: remove position and mark tracking as null
            positions.pop(sym, None)
            cost_tracking[sym] = None
            logger.info(" %s fully sold, marking cost_tracking as null", sym)

        # Update cost_tracking: move the start to the first counted BUY
        if wallet_amt > 0:
            first_buy_mts = _first_filled_buy_mts(orders)
            if first_buy_mts is not None:
                cost_tracking[sym] = first_buy_mts

        time.sleep(1)  # Rate limit between symbols (avoid nonce collisions)

    # 5. Remove positions for currencies no longer in wallet
    for sym in list(positions.keys()):
        currency = _symbol_to_currency(sym)
        if wallet_balances.get(currency, 0) <= 0:
            positions.pop(sym, None)
            if sym in cost_tracking:
                cost_tracking[sym] = None

    # 6. Save updated positions (update balances too)
    positions_data["positions"] = positions
    positions_data["total_balance_usdt"] = usdt_balance
    positions_data["available_usdt"] = usdt_available
    positions_data["last_updated"] = time.time()
    # Replace the file in one step so a concurrent reader never sees a
    # half-written document.
    _write_json_atomic(config.POSITIONS_FILE, positions_data)

    # 7. Save cost tracking
    save_cost_tracking(cost_tracking)
    logger.info("Cost basis sync complete. Updated %d positions.", len(positions))
# Script entry point: run a single sync pass when executed directly
# (the module docstring describes a 30-minute cron schedule).
if __name__ == "__main__":
    run_sync()