bifitnex-trading/sync_cost_basis.py
kroutony ce88afdb96 Add cost basis sync, fix stop-loss reliability, improve reporting
- New sync_cost_basis.py: recalculate entry_price from order history,
  sync amounts/balances from wallet (cron every 30 min)
- Fix stop-loss ID staleness: update stop_orders_by_sym in step 2b
  after placing new stops, preventing SELL failures from stale IDs
- Fix position limit inconsistency: use same total_balance in
  validate_trade instead of calling check_position_limit
- Skip stop-loss for positions below MIN_ORDER_AMOUNT
- Add API response body logging for 500 errors
- Cancel "Order not found" treated as success (not error)
- Post-trade wallet refresh to ensure fresh balances
- Report: show total value, total return %, exclude dust positions
- Crontab offset +1 min from candle close for complete data
- Handle PARTIALLY FILLED order status in cost calculation
- Sort orders by mts_create in Python (Bitfinex sort unreliable)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 11:18:49 +00:00

341 lines
12 KiB
Python

#!/usr/bin/env python3
"""Sync cost basis (entry_price), wallet balances, and per-coin amounts from order history.
Designed to run every 30 minutes via cron.
"""
import json
import logging
import os
import sys
import time
import config
import data_fetcher
import slack_notifier
# ---------------------------------------------------------------------------
# Logging setup (separate log file)
# ---------------------------------------------------------------------------
# Root logger configuration: INFO level, one handler writing to stdout and one
# writing to this script's own log file ("sync_cost_basis.log").
logging.basicConfig(
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("sync_cost_basis.log"),
    ],
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
# Module-level logger used by every function in this script.
logger = logging.getLogger("sync_cost_basis")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _symbol_to_currency(symbol: str) -> str:
"""Convert symbol like 'tETHUST' or 'tAVAX:UST' to currency code like 'ETH'."""
return symbol[1:].replace(":UST", "").replace("UST", "")
def _build_currency_to_symbol() -> dict[str, str]:
    """Return a lookup from currency code to symbol for every entry of config.TOP_15_SYMBOLS.

    If two symbols reduce to the same currency code, the later one wins.
    """
    lookup: dict[str, str] = {}
    for trading_symbol in config.TOP_15_SYMBOLS:
        lookup[_symbol_to_currency(trading_symbol)] = trading_symbol
    return lookup
# ---------------------------------------------------------------------------
# Cost tracking persistence
# ---------------------------------------------------------------------------
def load_cost_tracking() -> dict:
    """Load the cost-tracking map from config.COST_TRACKING_FILE.

    Returns:
        The parsed JSON object. An empty dict is returned when the file does
        not exist, cannot be read or decoded, or does not contain a JSON
        object. Every failure other than a missing file is logged, because an
        empty result makes run_sync re-initialize the tracking data.
    """
    try:
        with open(config.COST_TRACKING_FILE) as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return {}
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError as well as text decoding errors.
        logger.warning(
            "Could not load %s, starting with empty cost tracking: %s",
            config.COST_TRACKING_FILE, e,
        )
        return {}
    if not isinstance(data, dict):
        # Callers iterate with .items()/.get(); anything but an object is unusable.
        logger.warning(
            "%s does not contain a JSON object (got %s), ignoring it",
            config.COST_TRACKING_FILE, type(data).__name__,
        )
        return {}
    return data
def save_cost_tracking(data: dict) -> None:
    """Write *data* as indented JSON to config.COST_TRACKING_FILE.

    The JSON is first written to a sibling '<file>.tmp' and then moved over the
    target with os.replace, so an interrupted run cannot leave a truncated
    tracking file behind (load_cost_tracking would treat that as empty).
    """
    target = config.COST_TRACKING_FILE
    tmp_path = f"{target}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=2)
    # Same directory as the target, so the rename stays on one filesystem.
    os.replace(tmp_path, target)
def init_cost_tracking(positions: dict) -> dict:
    """Build a fresh tracking map giving every position symbol the default start_ms.

    Each key of *positions* is mapped to config.INIT_COST_BASIS_START_MS.
    """
    return {symbol: config.INIT_COST_BASIS_START_MS for symbol in positions}
# ---------------------------------------------------------------------------
# Fetch order history with pagination
# ---------------------------------------------------------------------------
def fetch_orders_hist(symbol: str, start_ms: int) -> list[list]:
    """Fetch the order history for *symbol* from start_ms to now, with pagination.

    Pages of up to 500 rows are requested through data_fetcher._auth_post,
    moving the ``end`` bound backwards after every full page until a short
    page, an empty/non-list response, or a page without new orders is seen.

    Args:
        symbol: Trading symbol inserted into the endpoint path.
        start_ms: Lower time bound in milliseconds.

    Returns:
        Order rows de-duplicated by order id (index 0) and sorted ascending by
        mts_create (index 4). An empty list is returned when any request
        fails: a partially fetched history would produce a wrong cost basis,
        so it is discarded rather than returned.
    """
    page_limit = 500
    path = f"/v2/auth/r/orders/{symbol}/hist"
    end_ms = int(time.time() * 1000)
    orders_by_id: dict = {}

    while True:
        body = {"start": start_ms, "end": end_ms, "limit": page_limit}
        try:
            raw = data_fetcher._auth_post(path, body)
        except Exception as e:
            logger.error("Failed to fetch orders for %s: %s", symbol, e)
            return []
        if not raw or not isinstance(raw, list):
            break

        # Keep only rows that look like orders (need id at 0 and a numeric mts_create at 4).
        rows = [
            row for row in raw
            if isinstance(row, (list, tuple))
            and len(row) > 4
            and isinstance(row[4], (int, float))
        ]
        added = 0
        for row in rows:
            if row[0] not in orders_by_id:
                orders_by_id[row[0]] = row
                added += 1

        # A short page means the history is exhausted; a page with nothing new
        # means the cursor is not advancing, so stop instead of looping forever.
        if len(raw) < page_limit or added == 0:
            break

        # NOTE(review): pagination assumes the endpoint's start/end filter lines
        # up with mts_create and that pages come newest-first — confirm against
        # the Bitfinex documentation.
        oldest_mts = min(row[4] for row in rows)
        # Re-query inclusively from the oldest timestamp so orders sharing that
        # millisecond are not skipped (duplicates are dropped by id above), but
        # always move the bound back by at least 1 ms.
        end_ms = min(oldest_mts, end_ms - 1)

    # Sort by mts_create ascending (Bitfinex default order is unreliable)
    return sorted(orders_by_id.values(), key=lambda o: o[4])
# ---------------------------------------------------------------------------
# Cost basis calculation
# ---------------------------------------------------------------------------
def calculate_cost_basis(
    orders: list[list], *, dust_epsilon: float = 1e-9
) -> tuple[float, float]:
    """Calculate the remaining amount and average entry price from sorted orders.

    Order fields: [id, gid, cid, symbol, mts_create, mts_update, amount, amount_orig,
                   type, type_prev, mts_tif, _, flags, status, _, _, price, price_avg,
                   price_trailing, ...]
    - amount (index 6): remaining amount (0 if fully filled)
    - amount_orig (index 7): original order amount (positive=buy, negative=sell)
    - status (index 13): only orders whose status contains "EXECUTED" or
      "PARTIALLY FILLED" are counted
    - price_avg (index 17): average execution price; orders without a positive
      value are skipped

    Buys add their executed amount at price_avg to the running cost. Sells
    reduce the running cost proportionally to the share of the position sold,
    so the average entry price of what remains is unchanged.

    Args:
        orders: Order rows, expected in ascending chronological order.
        dust_epsilon: After a sell, a remaining amount at or below this value
            is treated as fully sold. This absorbs floating-point residue
            (e.g. 0.1 + 0.2 - 0.3) that would otherwise leave a microscopic
            position with a meaningless entry price.

    Returns:
        (amount, entry_price); both are 0.0 when nothing is held.
    """
    held = 0.0
    total_cost = 0.0

    for order in orders:
        status = order[13] if len(order) > 13 else ""
        if not isinstance(status, str):
            continue
        # Include EXECUTED and PARTIALLY FILLED orders; skip CANCELED with no fill
        if "EXECUTED" not in status and "PARTIALLY FILLED" not in status:
            continue

        amount_orig = float(order[7])  # positive = buy, negative = sell
        amount_remaining = float(order[6])  # 0 if fully filled
        price_avg = float(order[17]) if len(order) > 17 and order[17] else 0.0
        if price_avg <= 0:
            continue

        # Actual executed amount = amount_orig - amount_remaining
        exec_amt = abs(amount_orig - amount_remaining)
        if exec_amt <= 0:
            continue

        if amount_orig > 0:  # BUY
            total_cost += exec_amt * price_avg
            held += exec_amt
            continue

        # SELL: drop the sold share of the cost, capped at the whole position.
        if held > 0:
            total_cost *= 1 - min(exec_amt / held, 1.0)
        held -= exec_amt
        if held <= dust_epsilon:
            # Oversold or only rounding residue left: the position is closed.
            held = 0.0
            total_cost = 0.0

    entry_price = total_cost / held if held > 0 else 0.0
    return held, entry_price
# ---------------------------------------------------------------------------
# Main sync logic
# ---------------------------------------------------------------------------
def _first_filled_buy_mts(orders: list[list]):
    """Return mts_create (index 4) of the first BUY order that has a fill, or None.

    Uses the same status test as calculate_cost_basis ("EXECUTED" or
    "PARTIALLY FILLED"), so the tracking window never starts after an order
    that the cost calculation counted.
    """
    for order in orders:
        status = order[13] if len(order) > 13 else ""
        if not isinstance(status, str):
            continue
        has_fill = "EXECUTED" in status or "PARTIALLY FILLED" in status
        if has_fill and float(order[7]) > 0:
            return order[4]
    return None


def run_sync():
    """Run one sync pass: wallet balances, per-symbol cost basis, positions file.

    Steps:
      1. Fetch exchange wallet balances via data_fetcher.fetch_account_status;
         the run is aborted (nothing written) if that call raises.
      2. Load cost tracking and the positions file, initializing cost tracking
         from the current positions when it is empty.
      3. Collect the symbols to process from positions, active cost-tracking
         entries and non-zero wallet balances.
      4. For each symbol, fetch its order history, recompute the entry price,
         warn on Slack when the computed amount deviates more than 5% from the
         wallet, and update positions / cost tracking. The wallet amount is
         always used as the authoritative position size.
      5. Drop positions whose currency is no longer in the wallet.
      6. Write the positions file (including USDT balances).
      7. Write the cost-tracking file.
    """
    logger.info("=" * 60)
    logger.info("Starting cost basis sync")

    # 1. Fetch wallet balances
    try:
        account_status = data_fetcher.fetch_account_status()
    except Exception as e:
        logger.error("Failed to fetch account status: %s", e)
        return

    wallet_balances: dict[str, float] = {}
    usdt_balance = 0.0
    usdt_available = 0.0
    for w in account_status.get("wallets", []):
        if w.get("type") == "exchange":
            currency = w["currency"]
            bal = w.get("balance", 0)
            if currency in ("UST", "USDT"):
                usdt_balance = bal
                # NOTE(review): `or bal` also replaces a genuine available
                # balance of 0 with the full balance — confirm this fallback is
                # only meant for a missing/None value.
                usdt_available = w.get("available", 0) or bal
            else:
                if bal > 0:
                    wallet_balances[currency] = bal

    currency_to_symbol = _build_currency_to_symbol()

    # 2. Load cost tracking and positions
    cost_tracking = load_cost_tracking()
    positions_data = {}
    if os.path.exists(config.POSITIONS_FILE):
        try:
            with open(config.POSITIONS_FILE) as f:
                positions_data = json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            # Keep going with empty data (as before), but leave a trace: the
            # file is rewritten from scratch at the end of this run.
            logger.warning("Could not load %s: %s", config.POSITIONS_FILE, e)
    positions = positions_data.get("positions", {})

    # Initialize cost_tracking if empty
    if not cost_tracking:
        cost_tracking = init_cost_tracking(positions)
        save_cost_tracking(cost_tracking)
        logger.info("Initialized cost_tracking.json with %d symbols", len(cost_tracking))

    # 3. Determine which symbols to process
    # Symbols from: current positions + cost_tracking + wallet balances
    all_symbols = set()
    all_symbols.update(positions.keys())
    all_symbols.update(k for k, v in cost_tracking.items() if v is not None)
    for currency, bal in wallet_balances.items():
        sym = currency_to_symbol.get(currency)
        if sym:
            all_symbols.add(sym)

    # 4. Process each symbol
    for sym in sorted(all_symbols):
        currency = _symbol_to_currency(sym)
        wallet_amt = wallet_balances.get(currency, 0)
        start_ms = cost_tracking.get(sym)

        # If no start_ms and no wallet balance, skip
        if start_ms is None and wallet_amt <= 0:
            continue

        # If wallet has balance but no tracking entry, initialize
        if start_ms is None and wallet_amt > 0:
            start_ms = config.INIT_COST_BASIS_START_MS
            cost_tracking[sym] = start_ms

        logger.info("Processing %s (wallet: %.6f, start_ms: %s)", sym, wallet_amt, start_ms)

        # Fetch order history
        orders = fetch_orders_hist(sym, start_ms)
        logger.info(" Fetched %d orders for %s", len(orders), sym)

        if not orders:
            # No orders found — keep existing position data but update amount from wallet
            if wallet_amt > 0:
                existing = positions.get(sym, {})
                existing_entry = existing.get("entry_price", 0)
                if existing_entry > 0:
                    positions[sym] = {
                        "amount": wallet_amt,
                        "entry_price": existing_entry,
                        "value_usdt": wallet_amt * existing_entry,
                        "last_update": time.time(),
                    }
            elif sym in positions:
                # No wallet balance and no orders — remove position
                positions.pop(sym, None)
                cost_tracking[sym] = None
            time.sleep(0.5)
            continue

        # Calculate cost basis
        calc_amount, entry_price = calculate_cost_basis(orders)

        # Deviation check: calculated amount vs wallet balance
        if wallet_amt > 0 and calc_amount > 0:
            deviation = abs(calc_amount - wallet_amt) / wallet_amt
            logger.info(
                " %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g, deviation=%.2f%%",
                sym, calc_amount, wallet_amt, entry_price, deviation * 100,
            )
            if deviation > 0.05:
                msg = (
                    f"<!channel>\n"
                    f"⚠️ *成本基礎偏差警告*\n"
                    f"幣種: {config.SYMBOL_NAMES.get(sym, sym)}\n"
                    f"計算量: {calc_amount:.6f}\n"
                    f"錢包量: {wallet_amt:.6f}\n"
                    f"偏差: {deviation:.2%}\n"
                    f"建議檢查 cost_tracking 起始日期"
                )
                # The alert is best-effort: a notification failure must not
                # abort the sync before positions and tracking are saved.
                try:
                    slack_notifier._send({"text": msg})
                except Exception:
                    logger.exception(
                        " Deviation > 5%% for %s, Slack notification failed", sym
                    )
                else:
                    logger.warning(" Deviation > 5%% for %s, Slack notified", sym)
        else:
            logger.info(
                " %s: calc_amount=%.6f, wallet=%.6f, entry=%.6g",
                sym, calc_amount, wallet_amt, entry_price,
            )

        # Update positions: use wallet amount as authoritative, calculated entry_price
        if wallet_amt > 0 and entry_price > 0:
            positions[sym] = {
                "amount": wallet_amt,
                "entry_price": entry_price,
                "value_usdt": wallet_amt * entry_price,
                "last_update": time.time(),
            }
        elif wallet_amt <= 0:
            # Fully sold — remove position and mark tracking as null
            positions.pop(sym, None)
            cost_tracking[sym] = None
            logger.info(" %s fully sold, marking cost_tracking as null", sym)

        # Update cost_tracking: move the window start to the first filled BUY
        if wallet_amt > 0:
            first_buy_mts = _first_filled_buy_mts(orders)
            if first_buy_mts is not None:
                cost_tracking[sym] = first_buy_mts

        time.sleep(1)  # Rate limit between symbols (avoid nonce collisions)

    # 5. Remove positions for currencies no longer in wallet
    for sym in list(positions.keys()):
        currency = _symbol_to_currency(sym)
        if wallet_balances.get(currency, 0) <= 0:
            positions.pop(sym, None)
            if sym in cost_tracking:
                cost_tracking[sym] = None

    # 6. Save updated positions (update balances too)
    positions_data["positions"] = positions
    positions_data["total_balance_usdt"] = usdt_balance
    positions_data["available_usdt"] = usdt_available
    positions_data["last_updated"] = time.time()
    # Write to a sibling temp file and rename it into place so a reader (or a
    # crash mid-write) never sees a truncated positions file.
    tmp_path = f"{config.POSITIONS_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(positions_data, f, indent=2)
    os.replace(tmp_path, config.POSITIONS_FILE)

    # 7. Save cost tracking
    save_cost_tracking(cost_tracking)
    logger.info("Cost basis sync complete. Updated %d positions.", len(positions))
# Script entry point: perform a single sync pass when the file is executed
# directly (the module docstring describes running it from cron).
if __name__ == "__main__":
    run_sync()