#!/usr/bin/env python3
"""Sync cost basis (entry_price), wallet balances, and per-coin amounts from order history.

Designed to run every 30 minutes via cron.
"""

import json
import logging
import os
import sys
import time
from datetime import datetime, timezone, timedelta

import config
import data_fetcher
import slack_notifier

# ---------------------------------------------------------------------------
# Logging setup (separate log file)
# ---------------------------------------------------------------------------

_TZ_UTC8 = timezone(timedelta(hours=8))


def _utc8_converter(*args):
    """Convert a log record's epoch timestamp to a UTC+8 ``struct_time``.

    Installed as ``logging.Formatter.converter``. Because a plain function
    stored on the class is invoked as a bound method (formatter, seconds),
    the timestamp is taken from the last positional argument. Falls back to
    the current time when no numeric timestamp is supplied.
    """
    if args and isinstance(args[-1], (int, float)):
        return datetime.fromtimestamp(args[-1], _TZ_UTC8).timetuple()
    return datetime.now(_TZ_UTC8).timetuple()


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("sync_cost_basis.log"),
    ],
)
# Render %(asctime)s in UTC+8 using the record's own creation time.
logging.Formatter.converter = _utc8_converter
logger = logging.getLogger("sync_cost_basis")

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Quote-currency suffixes, longest first so ':UST' wins over 'UST'.
_QUOTE_SUFFIXES = (":UST", "UST")

# Page size requested from the order-history endpoint.
_ORDERS_PAGE_LIMIT = 500


def _symbol_to_currency(symbol: str) -> str:
    """Convert symbol like 'tETHUST' or 'tAVAX:UST' to currency code like 'ETH'.

    Only the trailing quote suffix is removed, so a base currency that itself
    contains 'UST' is left intact.
    """
    pair = symbol[1:]  # drop the leading one-character prefix (e.g. 't')
    for suffix in _QUOTE_SUFFIXES:
        if pair.endswith(suffix):
            return pair[: -len(suffix)]
    return pair


def _build_currency_to_symbol() -> dict[str, str]:
    """Build mapping: currency code → symbol from TOP_15_SYMBOLS."""
    return {_symbol_to_currency(sym): sym for sym in config.TOP_15_SYMBOLS}


def _order_has_fill(order: list) -> bool:
    """Return True if the order's status (index 13) reports any execution.

    Matches both 'EXECUTED' and 'PARTIALLY FILLED' so that every caller
    agrees on which orders contributed to the position.
    """
    status = order[13] if len(order) > 13 else ""
    if not isinstance(status, str):
        return False
    return "EXECUTED" in status or "PARTIALLY FILLED" in status


def _load_json_file(path: str) -> dict:
    """Return the parsed JSON content of *path*, or ``{}`` if it is missing
    or cannot be read/parsed (the failure is logged, not raised)."""
    if not os.path.exists(path):
        return {}
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not read %s (%s); treating it as empty", path, e)
        return {}


def _write_json_file(path: str, data: dict) -> None:
    """Serialize *data* to *path* as indented JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)


# ---------------------------------------------------------------------------
# Cost tracking persistence
# ---------------------------------------------------------------------------


def load_cost_tracking() -> dict:
    """Load the symbol → start_ms mapping from ``config.COST_TRACKING_FILE``.

    Returns an empty dict when the file is absent or unreadable.
    """
    return _load_json_file(config.COST_TRACKING_FILE)


def save_cost_tracking(data: dict):
    """Persist the symbol → start_ms mapping to ``config.COST_TRACKING_FILE``."""
    _write_json_file(config.COST_TRACKING_FILE, data)


def init_cost_tracking(positions: dict) -> dict:
    """Initialize cost_tracking for current positions with default start_ms."""
    return {sym: config.INIT_COST_BASIS_START_MS for sym in positions}


# ---------------------------------------------------------------------------
# Fetch order history with pagination
# ---------------------------------------------------------------------------


def fetch_orders_hist(symbol: str, start_ms: int) -> list[list]:
    """Fetch all orders for symbol from start_ms to now, with pagination.

    Pages are requested walking backwards in time: after each full page the
    ``end`` bound is moved just before the earliest ``mts_create`` (index 4)
    seen in that page.

    Returns the orders sorted by ``mts_create`` ascending. If any request
    fails or returns an unexpected payload, an empty list is returned rather
    than a partial history, because a truncated history would yield a wrong
    cost basis.
    """
    path = f"/v2/auth/r/orders/{symbol}/hist"
    all_orders: list[list] = []
    end_ms = int(time.time() * 1000)

    while True:
        body = {"start": start_ms, "end": end_ms, "limit": _ORDERS_PAGE_LIMIT}
        try:
            raw = data_fetcher._auth_post(path, body)
        except Exception as e:
            logger.error("Failed to fetch orders for %s: %s", symbol, e)
            if all_orders:
                logger.warning(
                    "Discarding %d partially fetched orders for %s",
                    len(all_orders), symbol,
                )
            return []

        if not raw:
            break  # no more orders in the requested window
        if not isinstance(raw, list):
            logger.error(
                "Unexpected order history payload for %s: %r", symbol, raw
            )
            return []

        all_orders.extend(raw)
        if len(raw) < _ORDERS_PAGE_LIMIT:
            break  # short page means this was the last one

        # Paginate: default is descending, so oldest is last
        end_ms = min(o[4] for o in raw) - 1  # just before earliest mts_create

    # Sort by mts_create ascending (Bitfinex default order is unreliable)
    all_orders.sort(key=lambda o: o[4])
    return all_orders


# ---------------------------------------------------------------------------
# Cost basis calculation
# ---------------------------------------------------------------------------


def calculate_cost_basis(orders: list[list]) -> tuple[float, float]:
    """Calculate cost basis from sorted orders. Returns (amount, entry_price).

    Order fields: [id, gid, cid, symbol, mts_create, mts_update, amount,
    amount_orig, type, type_prev, mts_tif, _, flags, status, _, _, price,
    price_avg, price_trailing, ...]

    - amount (index 6): remaining amount (0 if fully filled)
    - amount_orig (index 7): original order amount (positive=buy, negative=sell)
    - price_avg (index 17): average execution price

    Buys add to the held amount and total cost. Sells reduce the total cost
    proportionally to the fraction of the holding sold, leaving the average
    entry price unchanged. Selling more than is held resets both to zero.
    """
    amount = 0.0
    total_cost = 0.0

    for order in orders:
        # Include EXECUTED and PARTIALLY FILLED orders; skip CANCELED with no fill
        if not _order_has_fill(order):
            continue

        amount_orig = float(order[7])  # positive = buy, negative = sell
        amount_remaining = float(order[6])  # 0 if fully filled
        price_avg = float(order[17]) if len(order) > 17 and order[17] else 0.0
        if price_avg <= 0:
            continue

        # Actual executed amount = amount_orig - amount_remaining
        exec_amt = abs(amount_orig - amount_remaining)
        if exec_amt <= 0:
            continue

        if amount_orig > 0:
            # BUY
            total_cost += exec_amt * price_avg
            amount += exec_amt
        elif amount > 0:
            # SELL against an existing holding
            ratio = min(exec_amt / amount, 1.0)
            total_cost *= 1 - ratio
            amount -= exec_amt
            if amount < 0:
                amount = 0.0
                total_cost = 0.0

    entry_price = total_cost / amount if amount > 0 else 0.0
    return amount, entry_price


# ---------------------------------------------------------------------------
# Main sync logic
# ---------------------------------------------------------------------------


def run_sync():
    """Run one full sync pass.

    Reads exchange wallet balances, recomputes each held symbol's entry price
    from its order history, then rewrites ``config.POSITIONS_FILE`` and the
    cost-tracking file. Returns early (writing nothing) if the account status
    cannot be fetched.
    """
    logger.info("=" * 60)
    logger.info("Starting cost basis sync")

    # 1. Fetch wallet balances
    try:
        account_status = data_fetcher.fetch_account_status()
    except Exception as e:
        logger.error("Failed to fetch account status: %s", e)
        return

    wallet_balances: dict[str, float] = {}
    usdt_balance = 0.0
    usdt_available = 0.0
    for w in account_status.get("wallets", []):
        if w.get("type") != "exchange":
            continue
        currency = w["currency"]
        bal = w.get("balance") or 0
        if currency in ("UST", "USDT"):
            usdt_balance = bal
            # Fall back to the balance only when 'available' is absent; a real
            # value of 0 (everything locked in orders) must be kept as 0.
            available = w.get("available")
            usdt_available = bal if available is None else available
        elif bal > 0:
            wallet_balances[currency] = bal

    currency_to_symbol = _build_currency_to_symbol()

    # 2. Load cost tracking and positions
    cost_tracking = load_cost_tracking()
    positions_data = _load_json_file(config.POSITIONS_FILE)
    positions = positions_data.get("positions", {})

    # Initialize cost_tracking if empty
    if not cost_tracking:
        cost_tracking = init_cost_tracking(positions)
        save_cost_tracking(cost_tracking)
        logger.info("Initialized cost_tracking.json with %d symbols", len(cost_tracking))

    # 3. Determine which symbols to process
    # Symbols from: current positions + cost_tracking + wallet balances
    all_symbols = set(positions)
    all_symbols.update(k for k, v in cost_tracking.items() if v is not None)
    for currency in wallet_balances:
        sym = currency_to_symbol.get(currency)
        if sym:
            all_symbols.add(sym)

    # 4. Process each symbol
    for sym in sorted(all_symbols):
        currency = _symbol_to_currency(sym)
        wallet_amt = wallet_balances.get(currency, 0)
        start_ms = cost_tracking.get(sym)

        if start_ms is None:
            # If no start_ms and no wallet balance, skip
            if wallet_amt <= 0:
                continue
            # If wallet has balance but no tracking entry, initialize
            start_ms = config.INIT_COST_BASIS_START_MS
            cost_tracking[sym] = start_ms

        logger.info("Processing %s (wallet: %.6f, start_ms: %s)", sym, wallet_amt, start_ms)

        # Fetch order history
        orders = fetch_orders_hist(sym, start_ms)
        logger.info(" Fetched %d orders for %s", len(orders), sym)

        if not orders:
            # No orders found — keep existing position data but update amount from wallet
            if wallet_amt > 0:
                existing = positions.get(sym, {})
                existing_entry = existing.get("entry_price", 0)
                if existing_entry > 0:
                    positions[sym] = {
                        "amount": wallet_amt,
                        "entry_price": existing_entry,
                        "value_usdt": wallet_amt * existing_entry,
                        "last_update": time.time(),
                    }
            elif sym in positions:
                # No wallet balance and no orders — remove position
                positions.pop(sym, None)
                cost_tracking[sym] = None
            time.sleep(0.5)
            continue

        # Calculate cost basis
        calc_amount, entry_price = calculate_cost_basis(orders)

        # Deviation check: calculated amount vs wallet balance
        if wallet_amt > 0 and calc_amount > 0:
            deviation = abs(calc_amount - wallet_amt) / wallet_amt
            logger.info(
                " %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g, deviation=%.2f%%",
                sym, calc_amount, wallet_amt, entry_price, deviation * 100,
            )
            if deviation > 0.05:
                msg = (
                    f"\n"
                    f"⚠️ *成本基礎偏差警告*\n"
                    f"幣種: {config.SYMBOL_NAMES.get(sym, sym)}\n"
                    f"計算量: {calc_amount:.6f}\n"
                    f"錢包量: {wallet_amt:.6f}\n"
                    f"偏差: {deviation:.2%}\n"
                    f"建議檢查 cost_tracking 起始日期"
                )
                # A failed notification must not abort the sync and lose the
                # updates computed so far.
                try:
                    slack_notifier._send({"text": msg})
                except Exception as e:
                    logger.error(
                        " Deviation > 5%% for %s, Slack notification failed: %s",
                        sym, e,
                    )
                else:
                    logger.warning(" Deviation > 5%% for %s, Slack notified", sym)
        else:
            logger.info(
                " %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g",
                sym, calc_amount, wallet_amt, entry_price,
            )

        # Update positions: use wallet amount as authoritative, calculated entry_price
        if wallet_amt > 0 and entry_price > 0:
            positions[sym] = {
                "amount": wallet_amt,
                "entry_price": entry_price,
                "value_usdt": wallet_amt * entry_price,
                "last_update": time.time(),
            }
        elif wallet_amt <= 0:
            # Fully sold — remove position and mark tracking as null
            positions.pop(sym, None)
            cost_tracking[sym] = None
            logger.info(" %s fully sold, marking cost_tracking as null", sym)

        # Update cost_tracking: find first BUY mts_create
        if wallet_amt > 0:
            # Use the same fill test as calculate_cost_basis so a partially
            # filled first buy is not dropped from the next sync's window.
            first_buy_mts = next(
                (
                    order[4]  # mts_create
                    for order in orders
                    if _order_has_fill(order) and float(order[7]) > 0
                ),
                None,
            )
            if first_buy_mts is not None:
                cost_tracking[sym] = first_buy_mts

        time.sleep(1)  # Rate limit between symbols (avoid nonce collisions)

    # 5. Remove positions for currencies no longer in wallet
    for sym in list(positions):
        currency = _symbol_to_currency(sym)
        if wallet_balances.get(currency, 0) <= 0:
            positions.pop(sym, None)
            if sym in cost_tracking:
                cost_tracking[sym] = None

    # 6. Save updated positions (update balances too)
    positions_data["positions"] = positions
    positions_data["total_balance_usdt"] = usdt_balance
    positions_data["available_usdt"] = usdt_available
    positions_data["last_updated"] = time.time()
    _write_json_file(config.POSITIONS_FILE, positions_data)

    # 7. Save cost tracking
    save_cost_tracking(cost_tracking)

    logger.info("Cost basis sync complete. Updated %d positions.", len(positions))


if __name__ == "__main__":
    run_sync()