"""
TraderMakeMoney Journal Integration for Bybit Zvwap Bot
=======================================================
Auto-tags Zvwap trades: col10=strategy (Zvwap), col1=direction (L/S).
Descriptions store analytics data (Z, NATR, CHOP, Vol, MaxDD, ROI) for optimization.
Rate limited: min 3s between API calls, retry queue with dedup.
"""
import logging
import os
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
logger = logging.getLogger("tmm")
VANCOUVER_TZ = ZoneInfo("America/Vancouver")
TMM_API_KEY = os.environ.get("TMM_API_KEY", "")
TMM_BASE_URL = "https://tradermake.money/api/v2"
TMM_API_KEY_ID = int(os.environ.get("TMM_API_KEY_ID", "276317"))
TAG_COL_STRATEGY = 10 # strategy tag: "DCA"
TAG_COL_DIRECTION = 1 # direction tag: "L" or "S"
MIN_API_INTERVAL = 3.0
class TMMClient:
def __init__(self):
self.api_key = TMM_API_KEY
self.session = requests.Session()
self.session.headers.update({
"API-KEY": self.api_key,
"Content-Type": "application/json",
})
self.enabled = bool(self.api_key)
self._pending_tags = []
self._tagged_trade_ids = set()
self._symbol_to_trade = {} # symbol β trade_id mapping for close desc
self._last_api_call = 0.0
if self.enabled:
logger.info("TMM integration enabled")
else:
logger.warning("TMM disabled (no API key)")
# ββ HTTP ββββββββββββββββββββββββββββββββββββββββββββββββββ
def _rate_wait(self):
now = time.time()
elapsed = now - self._last_api_call
if elapsed < MIN_API_INTERVAL:
time.sleep(MIN_API_INTERVAL - elapsed)
self._last_api_call = time.time()
def _get(self, path, params=None):
if not self.enabled:
return None
try:
self._rate_wait()
resp = self.session.get(f"{TMM_BASE_URL}{path}", params=params, timeout=10)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"TMM GET {path}: {e}")
return None
def _post(self, path, data=None):
if not self.enabled:
return None
try:
self._rate_wait()
resp = self.session.post(f"{TMM_BASE_URL}{path}", json=data, timeout=10)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"TMM POST {path}: {e}")
return None
# ββ HELPERS βββββββββββββββββββββββββββββββββββββββββββββββ
@staticmethod
def _fmt_volume(vol):
"""Format volume: 45200000 β '$45.2M'."""
if vol >= 1_000_000_000:
return f"${vol / 1e9:.1f}B"
if vol >= 1_000_000:
return f"${vol / 1e6:.0f}M"
if vol >= 1_000:
return f"${vol / 1e3:.0f}K"
return f"${vol:.0f}"
# ββ FIND TRADE ββββββββββββββββββββββββββββββββββββββββββββ
def find_recent_trade(self, symbol, side, window_ms=300_000, tagged_only=False):
"""Find most recent TMM trade for symbol+side within window.
Args:
tagged_only: if True, only return already-tagged trades (for close desc update).
if False, skip tagged trades (for entry tagging).
"""
result = self._get("/trades", params={
"page": 1, "itemsPerPage": 100,
"sortBy": "open_time", "sortDesc": "true",
})
if not result or "data" not in result:
return None
now_ms = int(time.time() * 1000)
for trade in result["data"]:
if trade["symbol"] != symbol:
continue
t_side = trade["side"].upper()
if (t_side == "LONG" and side != "BUY") or (t_side == "SHORT" and side != "SELL"):
continue
trade_id = trade["id"]
if tagged_only and trade_id not in self._tagged_trade_ids:
continue
if not tagged_only and trade_id in self._tagged_trade_ids:
continue
if abs(trade["open_time"] - now_ms) <= window_ms:
return trade_id
return None
# ββ TAG & DESCRIBE ββββββββββββββββββββββββββββββββββββββββ
def tag_trade(self, trade_id, tag_name, column=TAG_COL_STRATEGY):
result = self._post(f"/trades/{trade_id}/tags", {
"tags": [{"name": tag_name, "column": column}],
})
if result and result.get("status") == "success":
logger.info(f"TMM: tagged #{trade_id} β '{tag_name}' (col={column})")
return True
return False
def tag_trade_both(self, trade_id, strategy_tag, direction_tag):
"""Apply both strategy + direction tags in one API call (prevents overwrite)."""
result = self._post(f"/trades/{trade_id}/tags", {
"tags": [
{"name": strategy_tag, "column": TAG_COL_STRATEGY},
{"name": direction_tag, "column": TAG_COL_DIRECTION},
],
})
if result and result.get("status") == "success":
logger.info(f"TMM: tagged #{trade_id} β '{strategy_tag}' + '{direction_tag}'")
return True
return False
def update_description(self, trade_id, description):
result = self._post(f"/trades/{trade_id}/update", {
"description": description,
})
if result and result.get("status") == "success":
logger.info(f"TMM: desc updated #{trade_id}")
return True
return False
# ββ DCA EVENTS ββββββββββββββββββββββββββββββββββββββββββββ
def on_dca_entry(self, symbol, side, z_score, entry_price,
natr=0.0, volume_24h=0.0, deals_open=0, balance=0.0,
bo_usd=20.0, max_so=0, leverage=3,
chop=0.0, tp_pct=3.0, sl_pct=1.0):
"""Queue tag for Zvwap deal entry with analytics data."""
if not self.enabled:
return
direction = "L" if side == "BUY" else "S"
dir_full = "LONG" if side == "BUY" else "SHORT"
vol_str = self._fmt_volume(volume_24h)
# Calculate TP/SL prices for description
if side == "BUY":
tp_price = entry_price * (1 + tp_pct / 100)
sl_price = entry_price * (1 - sl_pct / 100)
else:
tp_price = entry_price * (1 - tp_pct / 100)
sl_price = entry_price * (1 + sl_pct / 100)
description = (
f"Z={z_score:+.2f} | NATR={natr:.2f}% | CHOP={chop:.0f}\n"
f"Entry=${entry_price:.6f} | BO=${bo_usd:.0f} | Lev={leverage}x\n"
f"TP={tp_pct}% (${tp_price:.6f}) | SL={sl_pct}% (${sl_price:.6f})\n"
f"Vol={vol_str} | Deals={deals_open}/6 | Bal=${balance:.0f}"
)
self._pending_tags.append({
"symbol": symbol,
"side": side,
"tag_strategy": "Zvwap",
"tag_direction": direction,
"description": description,
"attempts": 0,
"next_retry": time.time() + 8,
})
logger.info(f"TMM: Zvwap entry {symbol} {dir_full} Z={z_score:+.2f} β queued")
def on_dca_so_filled(self, trade_id, symbol, side, so_num, so_price,
avg_price, total_qty, invested, tp_pct=1.5,
natr=0.0, volume_24h=0.0, z_entry=0.0):
"""Update description when SO fills β keep entry info + add SO status."""
if not self.enabled or trade_id not in self._tagged_trade_ids:
return
if side == "BUY":
tp_price = avg_price * (1 + tp_pct / 100)
else:
tp_price = avg_price * (1 - tp_pct / 100)
vol_str = self._fmt_volume(volume_24h)
self.update_description(trade_id, (
f"Z={z_entry:+.2f} | NATR={natr:.2f}% | Vol={vol_str}\n"
f"SO{so_num} @ ${so_price:.6f} | Avg=${avg_price:.6f}\n"
f"TP=${tp_price:.6f} | Invested=${invested:.1f}"
))
def on_dca_close(self, symbol, side, pnl, reason, duration_min, so_count,
z_entry, z_exit=None, invested=0.0, max_dd_pct=0.0, max_so=0,
natr=0.0, volume_24h=0.0, chop=0.0,
tp_pct=3.0, sl_pct=1.0, max_favorable_pct=0.0):
"""Update description on deal close with entry + close analytics."""
if not self.enabled:
return
roi = (pnl / invested * 100) if invested > 0 else 0.0
z_exit_str = f"{z_exit:+.2f}" if z_exit is not None else "n/a"
vol_str = self._fmt_volume(volume_24h)
description = (
f"PnL=${pnl:+.4f} | ROI={roi:+.1f}% | {duration_min:.0f}min\n"
f"Reason={reason} | R:R=3:1 (TP={tp_pct}%/SL={sl_pct}%)\n"
f"Z: {z_entry:+.2f}β{z_exit_str} | MFE=+{max_favorable_pct:.1f}% | MaxDD={max_dd_pct:+.1f}%\n"
f"NATR={natr:.2f}% | CHOP={chop:.0f} | Vol={vol_str} | Inv=${invested:.1f}"
)
# Find and update existing trade
trade_id = self._find_tagged_trade(symbol, side)
if trade_id:
self.update_description(trade_id, description)
else:
logger.warning(f"TMM: couldn't find trade to update close desc for {symbol}")
def on_dca_recovered(self, symbol, side, entry_price, qty,
z_score=0.0, natr=0.0, chop=0.0, volume_24h=0.0,
tp_pct=3.0, sl_pct=1.0, leverage=3):
"""Tag a recovered deal (found on exchange after restart) with full analytics."""
if not self.enabled:
return
direction = "L" if side == "BUY" else "S"
vol_str = self._fmt_volume(volume_24h)
if side == "BUY":
tp_price = entry_price * (1 + tp_pct / 100)
sl_price = entry_price * (1 - sl_pct / 100)
else:
tp_price = entry_price * (1 - tp_pct / 100)
sl_price = entry_price * (1 + sl_pct / 100)
description = (
f"Z={z_score:+.2f} | NATR={natr:.2f}% | CHOP={chop:.0f}\n"
f"Entry=${entry_price:.6f} | BO=${entry_price * qty / leverage:.0f} | Lev={leverage}x\n"
f"TP={tp_pct}% (${tp_price:.6f}) | SL={sl_pct}% (${sl_price:.6f})\n"
f"Vol={vol_str} | (recovered after restart)"
)
self._pending_tags.append({
"symbol": symbol,
"side": side,
"tag_strategy": "Zvwap",
"tag_direction": direction,
"description": description,
"attempts": 0,
"next_retry": time.time() + 8,
"window_ms": 7 * 24 * 3600 * 1000, # 7 days β recovered deals may be old
})
logger.info(f"TMM: DCA recovered {symbol} {direction} β queued for tagging")
def _find_tagged_trade(self, symbol, side):
"""Find a previously tagged trade for this symbol."""
# Fast path: check our symbolβtrade_id mapping
trade_id = self._symbol_to_trade.get(symbol)
if trade_id and trade_id in self._tagged_trade_ids:
return trade_id
# Fallback: search recent trades in TMM (wider window for SO deals)
return self.find_recent_trade(symbol, side, window_ms=7200_000, tagged_only=True) # 2h
# ββ RETRY QUEUE βββββββββββββββββββββββββββββββββββββββββββ
def _apply_tags(self, trade_id, item):
"""Apply strategy + direction tags + description in minimal API calls."""
self.tag_trade_both(trade_id, item["tag_strategy"], item["tag_direction"])
self.update_description(trade_id, item["description"])
self._tagged_trade_ids.add(trade_id)
self._symbol_to_trade[item["symbol"]] = trade_id
if len(self._tagged_trade_ids) > 500:
self._tagged_trade_ids = set(list(self._tagged_trade_ids)[-300:])
# Clean up symbol mapping too
valid = self._tagged_trade_ids
self._symbol_to_trade = {s: t for s, t in self._symbol_to_trade.items() if t in valid}
def retry_pending_tags(self):
"""Process pending tags. Call from monitor loop. Max 1 per tick."""
if not self._pending_tags:
return
now = time.time()
remaining = []
processed_one = False
for item in self._pending_tags:
if processed_one or now < item["next_retry"]:
remaining.append(item)
continue
window = item.get("window_ms", 300_000)
trade_id = self.find_recent_trade(item["symbol"], item["side"], window_ms=window)
if trade_id:
self._apply_tags(trade_id, item)
processed_one = True
elif item["attempts"] < 8:
item["attempts"] += 1
item["next_retry"] = now + 20
remaining.append(item)
processed_one = True
else:
logger.warning(f"TMM: gave up tagging {item['symbol']} after 8 attempts")
self._pending_tags = remaining