#!/usr/bin/env python3
"""Sync cost basis (entry_price), wallet balances, and per-coin amounts from order history.

Designed to run every 30 minutes via cron.
"""

import json
import logging
import os
import sys
import time

import config
import data_fetcher
import slack_notifier

# ---------------------------------------------------------------------------
# Logging setup (separate log file)
# ---------------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("sync_cost_basis.log"),
    ],
)
logger = logging.getLogger("sync_cost_basis")


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _symbol_to_currency(symbol: str) -> str:
    """Convert symbol like 'tETHUST' or 'tAVAX:UST' to currency code like 'ETH'.

    The first character (the 't' marker) is dropped, then only a *trailing*
    'UST' quote currency (with its optional ':' separator) is removed.
    Stripping just the suffix keeps base codes that happen to contain the
    letters "UST" intact instead of deleting them from the middle of the code.
    """
    return symbol[1:].removesuffix("UST").removesuffix(":")


def _build_currency_to_symbol() -> dict[str, str]:
    """Build mapping: currency code -> symbol from config.TOP_15_SYMBOLS."""
    return {_symbol_to_currency(sym): sym for sym in config.TOP_15_SYMBOLS}


# ---------------------------------------------------------------------------
# Cost tracking persistence
# ---------------------------------------------------------------------------

def load_cost_tracking() -> dict:
    """Load the cost-tracking mapping from config.COST_TRACKING_FILE.

    Returns:
        The parsed JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
        Every failure other than "file missing" is logged, because the
        caller re-initialises and overwrites the file when it gets ``{}``.
    """
    path = config.COST_TRACKING_FILE
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been persisted yet.
        return {}
    except (ValueError, OSError) as exc:
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        logger.warning("Could not read cost tracking file %s: %s", path, exc)
        return {}
    if not isinstance(data, dict):
        logger.warning(
            "Cost tracking file %s does not contain a JSON object; ignoring it",
            path,
        )
        return {}
    return data


def save_cost_tracking(data: dict) -> None:
    """Persist *data* as indented JSON to config.COST_TRACKING_FILE.

    The JSON is written to a sibling temporary file and then moved over the
    target with os.replace, so an interrupted write cannot leave a truncated
    file behind.
    """
    path = config.COST_TRACKING_FILE
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, path)


def init_cost_tracking(positions: dict) -> dict:
    """Initialize cost_tracking for current positions with default start_ms.

    Every symbol in *positions* is mapped to config.INIT_COST_BASIS_START_MS.
    """
    return dict.fromkeys(positions, config.INIT_COST_BASIS_START_MS)

# ---------------------------------------------------------------------------
# Fetch order history with pagination
# ---------------------------------------------------------------------------

# Page size requested from the order-history endpoint; a shorter page means
# there is nothing older left to fetch.
_ORDERS_PAGE_LIMIT = 500


def fetch_orders_hist(symbol: str, start_ms: int) -> list[list]:
    """Fetch all orders for symbol from start_ms to now, with pagination.

    Pages are requested newest-first by moving the ``end`` bound back to the
    oldest ``mts_create`` (index 4) seen so far. The bound is kept inclusive
    and rows are de-duplicated by order id (index 0), so orders that share
    the boundary millisecond are neither skipped nor counted twice.

    Args:
        symbol: Trading symbol used in the request path.
        start_ms: Lower time bound in milliseconds.

    Returns:
        Orders sorted by ``mts_create`` ascending. An empty list is returned
        when there are no orders *or* when any request fails: a partially
        fetched history would contain only the newest orders and would yield
        a wrong cost basis, so it is discarded rather than returned.
    """
    path = f"/v2/auth/r/orders/{symbol}/hist"
    end_ms = int(time.time() * 1000)
    orders_by_id: dict = {}

    while True:
        body = {"start": start_ms, "end": end_ms, "limit": _ORDERS_PAGE_LIMIT}
        try:
            raw = data_fetcher._auth_post(path, body)
        except Exception as e:
            logger.error("Failed to fetch orders for %s: %s", symbol, e)
            if orders_by_id:
                logger.error(
                    "Discarding %d partially fetched orders for %s",
                    len(orders_by_id), symbol,
                )
            return []

        if not raw or not isinstance(raw, list):
            break

        # Ignore anything that is not an order row with at least mts_create.
        rows = [o for o in raw if isinstance(o, (list, tuple)) and len(o) > 4]
        if not rows:
            break
        for row in rows:
            orders_by_id.setdefault(row[0], row)

        if len(raw) < _ORDERS_PAGE_LIMIT:
            break

        # Paginate: default is descending, so the oldest order bounds the next page.
        oldest_mts = min(o[4] for o in rows)
        if oldest_mts >= end_ms:
            # The bound did not move back; requesting again would loop forever.
            logger.warning(
                "Pagination for %s made no progress at end=%s; stopping",
                symbol, end_ms,
            )
            break
        end_ms = oldest_mts

    # Sort by mts_create ascending (Bitfinex default order is unreliable)
    return sorted(orders_by_id.values(), key=lambda o: o[4])


# ---------------------------------------------------------------------------
# Cost basis calculation
# ---------------------------------------------------------------------------

def _executed_fill(order: list) -> tuple[float, float]:
    """Return (signed executed amount, average price) for one order row.

    The amount is positive for a buy and negative for a sell. ``(0.0, 0.0)``
    is returned when the order does not count: its status (index 13) is
    missing, not a string, or mentions neither "EXECUTED" nor
    "PARTIALLY FILLED"; its price_avg (index 17) is missing or not positive;
    or nothing was actually executed.
    """
    status = order[13] if len(order) > 13 else ""
    if not isinstance(status, str):
        return 0.0, 0.0
    if "EXECUTED" not in status and "PARTIALLY FILLED" not in status:
        return 0.0, 0.0

    price_avg = float(order[17]) if len(order) > 17 and order[17] else 0.0
    if price_avg <= 0:
        return 0.0, 0.0

    amount_orig = float(order[7])       # positive = buy, negative = sell
    amount_remaining = float(order[6])  # 0 if fully filled
    # Actual executed amount = amount_orig - amount_remaining
    executed = abs(amount_orig - amount_remaining)
    if executed <= 0:
        return 0.0, 0.0
    return (executed if amount_orig > 0 else -executed), price_avg


def calculate_cost_basis(orders: list[list]) -> tuple[float, float]:
    """Calculate cost basis from sorted orders. Returns (amount, entry_price).

    Order fields: [id, gid, cid, symbol, mts_create, mts_update, amount,
    amount_orig, type, type_prev, mts_tif, _, flags, status, _, _, price,
    price_avg, price_trailing, ...]

    - amount (index 6): remaining amount (0 if fully filled)
    - amount_orig (index 7): original order amount (positive=buy, negative=sell)
    - price_avg (index 17): average execution price

    Buys add their executed cost to the running total. Sells reduce the
    running cost proportionally to the share of the holding that was sold,
    so the average entry price is unchanged by a sell. Selling more than is
    held resets both amount and cost to zero.

    Args:
        orders: Order rows in ascending ``mts_create`` order.

    Returns:
        ``(amount, entry_price)``; ``entry_price`` is 0.0 when nothing is held.
    """
    amount = 0.0
    total_cost = 0.0

    for order in orders:
        signed_amt, price_avg = _executed_fill(order)
        if signed_amt > 0:  # BUY
            total_cost += signed_amt * price_avg
            amount += signed_amt
        elif signed_amt < 0:  # SELL
            sold = -signed_amt
            if amount > 0:
                ratio = min(sold / amount, 1.0)
                total_cost *= (1 - ratio)
            amount -= sold
            if amount < 0:
                amount = 0.0
                total_cost = 0.0

    entry_price = total_cost / amount if amount > 0 else 0.0
    return amount, entry_price


# ---------------------------------------------------------------------------
# Main sync logic
# ---------------------------------------------------------------------------

def _load_positions_data() -> dict:
    """Load config.POSITIONS_FILE; return {} if it is missing, unreadable or not a JSON object."""
    path = config.POSITIONS_FILE
    if not os.path.exists(path):
        return {}
    try:
        with open(path) as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        # The file is rewritten at the end of the sync, so make the loss visible.
        logger.warning("Could not read positions file %s: %s", path, e)
        return {}
    if not isinstance(data, dict):
        logger.warning("Positions file %s does not contain a JSON object", path)
        return {}
    return data


def _write_json_atomic(path: str, payload: dict) -> None:
    """Write *payload* as indented JSON to *path* via a temp file and os.replace."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    os.replace(tmp_path, path)


def _first_buy_mts(orders: list[list]):
    """Return mts_create of the first order counted as a buy by the cost basis, or None."""
    for order in orders:
        if _executed_fill(order)[0] > 0:
            return order[4]  # mts_create
    return None


def run_sync():
    """Run one full sync of wallet balances, positions and cost tracking.

    Steps:
      1. Fetch the account status and collect "exchange" wallet balances.
      2. Load cost tracking and the positions file (initialising cost
         tracking from the current positions when it is empty).
      3. Collect the symbols to process from positions, cost tracking and
         wallet balances.
      4. For each symbol, fetch its order history, recompute the entry
         price, warn on Slack when the computed amount deviates more than
         5% from the wallet, and update positions / cost tracking.
      5. Drop positions whose currency is no longer in the wallet.
      6. Save the positions file, then 7. save cost tracking.

    Returns None. If the account status cannot be fetched, the sync is
    aborted before any file is modified.
    """
    logger.info("=" * 60)
    logger.info("Starting cost basis sync")

    # 1. Fetch wallet balances
    try:
        account_status = data_fetcher.fetch_account_status()
    except Exception as e:
        logger.error("Failed to fetch account status: %s", e)
        return

    wallet_balances: dict[str, float] = {}
    usdt_balance = 0.0
    usdt_available = 0.0

    for w in account_status.get("wallets", []):
        if w.get("type") != "exchange":
            continue
        currency = w["currency"]
        bal = w.get("balance") or 0  # treat a missing/None balance as 0
        if currency in ("UST", "USDT"):
            usdt_balance = bal
            # NOTE(review): a genuine available balance of 0 also falls back
            # to the full balance here — confirm that this is intended.
            usdt_available = w.get("available", 0) or bal
        elif bal > 0:
            wallet_balances[currency] = bal

    currency_to_symbol = _build_currency_to_symbol()

    # 2. Load cost tracking and positions
    cost_tracking = load_cost_tracking()

    positions_data = _load_positions_data()
    positions = positions_data.get("positions") or {}

    # Initialize cost_tracking if empty
    if not cost_tracking:
        cost_tracking = init_cost_tracking(positions)
        save_cost_tracking(cost_tracking)
        logger.info("Initialized cost_tracking.json with %d symbols", len(cost_tracking))

    # 3. Determine which symbols to process
    # Symbols from: current positions + cost_tracking + wallet balances
    all_symbols = set(positions)
    all_symbols.update(k for k, v in cost_tracking.items() if v is not None)
    for currency in wallet_balances:
        sym = currency_to_symbol.get(currency)
        if sym:
            all_symbols.add(sym)

    # 4. Process each symbol
    for sym in sorted(all_symbols):
        currency = _symbol_to_currency(sym)
        wallet_amt = wallet_balances.get(currency, 0)
        start_ms = cost_tracking.get(sym)

        if start_ms is None:
            # If no start_ms and no wallet balance, skip
            if wallet_amt <= 0:
                continue
            # If wallet has balance but no tracking entry, initialize
            start_ms = config.INIT_COST_BASIS_START_MS
            cost_tracking[sym] = start_ms

        logger.info("Processing %s (wallet: %.6f, start_ms: %s)", sym, wallet_amt, start_ms)

        # Fetch order history
        orders = fetch_orders_hist(sym, start_ms)
        logger.info("  Fetched %d orders for %s", len(orders), sym)

        if not orders:
            # No orders found — keep existing position data but update amount from wallet
            if wallet_amt > 0:
                existing = positions.get(sym, {})
                existing_entry = existing.get("entry_price") or 0
                if existing_entry > 0:
                    positions[sym] = {
                        "amount": wallet_amt,
                        "entry_price": existing_entry,
                        "value_usdt": wallet_amt * existing_entry,
                        "last_update": time.time(),
                    }
            elif sym in positions:
                # No wallet balance and no orders — remove position
                positions.pop(sym, None)
                cost_tracking[sym] = None
            time.sleep(0.5)
            continue

        # Calculate cost basis
        calc_amount, entry_price = calculate_cost_basis(orders)

        # Deviation check: calculated amount vs wallet balance
        if wallet_amt > 0 and calc_amount > 0:
            deviation = abs(calc_amount - wallet_amt) / wallet_amt
            logger.info(
                "  %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g, deviation=%.2f%%",
                sym, calc_amount, wallet_amt, entry_price, deviation * 100,
            )
            if deviation > 0.05:
                msg = (
                    f"\n"
                    f"⚠️ *成本基礎偏差警告*\n"
                    f"幣種: {config.SYMBOL_NAMES.get(sym, sym)}\n"
                    f"計算量: {calc_amount:.6f}\n"
                    f"錢包量: {wallet_amt:.6f}\n"
                    f"偏差: {deviation:.2%}\n"
                    f"建議檢查 cost_tracking 起始日期"
                )
                try:
                    slack_notifier._send({"text": msg})
                except Exception as e:
                    # A failed notification must not abort the sync and
                    # throw away the updates computed so far.
                    logger.error("  Failed to send Slack warning for %s: %s", sym, e)
                else:
                    logger.warning("  Deviation > 5%% for %s, Slack notified", sym)
        else:
            logger.info(
                "  %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g",
                sym, calc_amount, wallet_amt, entry_price,
            )

        # Update positions: use wallet amount as authoritative, calculated entry_price
        if wallet_amt > 0 and entry_price > 0:
            positions[sym] = {
                "amount": wallet_amt,
                "entry_price": entry_price,
                "value_usdt": wallet_amt * entry_price,
                "last_update": time.time(),
            }
        elif wallet_amt <= 0:
            # Fully sold — remove position and mark tracking as null
            positions.pop(sym, None)
            cost_tracking[sym] = None
            logger.info("  %s fully sold, marking cost_tracking as null", sym)

        # Update cost_tracking: find first BUY mts_create. The same fill
        # criterion as calculate_cost_basis is used, so a partially filled
        # first buy is not dropped from the window on the next run.
        if wallet_amt > 0:
            first_buy_mts = _first_buy_mts(orders)
            if first_buy_mts is not None:
                cost_tracking[sym] = first_buy_mts

        time.sleep(1)  # Rate limit between symbols (avoid nonce collisions)

    # 5. Remove positions for currencies no longer in wallet
    for sym in list(positions):
        currency = _symbol_to_currency(sym)
        if wallet_balances.get(currency, 0) <= 0:
            positions.pop(sym, None)
            if sym in cost_tracking:
                cost_tracking[sym] = None

    # 6. Save updated positions (update balances too)
    positions_data["positions"] = positions
    positions_data["total_balance_usdt"] = usdt_balance
    positions_data["available_usdt"] = usdt_available
    positions_data["last_updated"] = time.time()

    # Written atomically so an interrupted run cannot leave truncated JSON.
    _write_json_atomic(config.POSITIONS_FILE, positions_data)

    # 7. Save cost tracking
    save_cost_tracking(cost_tracking)

    logger.info("Cost basis sync complete. Updated %d positions.", len(positions))


if __name__ == "__main__":
    run_sync()