| Overall Statistics |
|
Total Orders 5997 Average Win 0.00% Average Loss 0.00% Compounding Annual Return 14.766% Drawdown 0.000% Expectancy 0.965 Start Equity 100000.00 End Equity 103687.18 Net Profit 3.687% Sharpe Ratio 9.737 Sortino Ratio 268.77 Probabilistic Sharpe Ratio 100% Loss Rate 19% Win Rate 81% Profit-Loss Ratio 1.43 Alpha 0.094 Beta 0 Annual Standard Deviation 0.01 Annual Variance 0 Information Ratio -6.292 Tracking Error 0.557 Treynor Ratio 672.529 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset ETHUSD 2XR Portfolio Turnover 255.76% Drawdown Recovery 10 |
# region imports
from AlgorithmImports import *
from collections import deque
from datetime import timedelta
import math
# endregion
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from AlgorithmImports import QCAlgorithm, BrokerageName, AccountType, Resolution, ConstantFeeModel, Slice, TickType, Tick, UpdateOrderFields, OrderStatus, OrderField, OrderEvent
class Liquiditymonstercore(QCAlgorithm):
def _get_param(self, key: str, default_value: str) -> str:
try:
getter = getattr(self, "get_parameter", None) or getattr(self, "GetParameter", None)
if getter is not None:
val = getter(key)
if val is not None and val != "":
return val
except Exception:
pass
return default_value
def initialize(self):
# ---- Configurable params ----
self.base_symbol = self._get_param("symbol", "ETHUSD")
self.market_name = self._get_param("market", "GDAX").upper()
self.brokerage_param = self._get_param("brokerage", "COINBASE").upper()
start = self._get_param("start", "2024-01-08")
end = self._get_param("end", "2024-04-12")
cash = float(self._get_param("cash", "100000"))
self.maker_fee_rate = float(self._get_param("maker_fee", "0.0002"))
self.taker_fee_rate = float(self._get_param("taker_fee", "0.0005"))
self.use_maker_for_fades = (self._get_param("use_maker_for_fades", "true").lower() == "true")
self.max_position_value = float(self._get_param("max_pos_value", "0.2"))
self.stop_vol_k = float(self._get_param("stop_vol_k", "3.0"))
self.time_stop_seconds = int(self._get_param("time_stop_s", "10"))
self.partial_tp_ratio = float(self._get_param("partial_tp_ratio", "0.5"))
self.partial_tp_target_sigma = float(self._get_param("partial_tp_sigma", "1.0"))
self.trail_sigma = float(self._get_param("trail_sigma", "1.0"))
# Fee model selection: "coinbase" (default) or "zero"
self.fee_model = self._get_param("fee_model", "zero").lower()
# Feature smoothing and thresholds
self.alpha_vol = float(self._get_param("alpha_vol", "0.1"))
self.alpha_ofi = float(self._get_param("alpha_ofi", "0.2"))
self.alpha_cvd = float(self._get_param("alpha_cvd", "0.2"))
self.micro_tilt_threshold = float(self._get_param("micro_tilt_threshold", "0.05"))
# Safety gates and cadence
self.max_spread_allowed = float(self._get_param("max_spread", "5.0"))
self.trade_cooldown_s = int(self._get_param("trade_cooldown_s", "5"))
# Breakout execution
self.breakout_sigma_band = float(self._get_param("breakout_sigma_band", "0.5"))
self.breakout_limit_extra_sigma = float(self._get_param("breakout_limit_extra_sigma", "0.2"))
# Risk and portfolio controls
self.risk_per_trade_pct = float(self._get_param("risk_per_trade_pct", "0.01"))
self.min_trade_notional = float(self._get_param("min_trade_notional", "50"))
self.max_positions = int(self._get_param("max_positions", "2"))
self.max_portfolio_heat_pct = float(self._get_param("max_portfolio_heat_pct", "0.30"))
self.daily_loss_limit_pct = float(self._get_param("daily_loss_limit_pct", "0.05"))
# Dynamic sizing controls
self.size_mult_per_confirmation = float(self._get_param("size_mult_per_confirmation", "0.25"))
self.size_edge_k = float(self._get_param("size_edge_k", "2.0"))
# Markout weighting and confidence band
self.markout_decay_alpha = float(self._get_param("markout_decay_alpha", "0.10"))
self.markout_w1 = float(self._get_param("markout_w1", "0.70"))
self.markout_w5 = float(self._get_param("markout_w5", "0.30"))
self.markout_w10 = float(self._get_param("markout_w10", "0.00"))
self.edge_band_sigma_mult = float(self._get_param("edge_band_sigma_mult", "0.20"))
# Protective stop configuration
self.protective_stop_cushion_sigma = float(self._get_param("protective_stop_cushion_sigma", "0.30"))
self.protective_stop_grace_s = int(self._get_param("protective_stop_grace_s", "1"))
# Impulse profit lock (safety-first)
self.impulse_lock_enabled = (self._get_param("impulse_lock_enabled", "true").lower() == "true")
self.impulse_lock_sigma = float(self._get_param("impulse_lock_sigma", "2.0"))
self.impulse_lock_window_s = int(self._get_param("impulse_lock_window_s", "60"))
self.impulse_scaleout_ratio = float(self._get_param("impulse_scaleout_ratio", "0.5"))
self.impulse_trail_sigma = float(self._get_param("impulse_trail_sigma", "0.5"))
# Breakeven + epsilon and adaptive time stop
self.breakeven_epsilon_sigma = float(self._get_param("breakeven_epsilon_sigma", "0.10"))
self.adaptive_time_stop_min_s = int(self._get_param("adaptive_time_stop_min_s", "5"))
self.adaptive_momentum_window_s = int(self._get_param("adaptive_momentum_window_s", "10"))
self.adaptive_momentum_sigma = float(self._get_param("adaptive_momentum_sigma", "0.25"))
# Volatility spike gate
self.alpha_vol_baseline = float(self._get_param("alpha_vol_baseline", "0.02"))
self.vol_spike_mult = float(self._get_param("vol_spike_mult", "4.0"))
self.vol_spike_cooloff_s = int(self._get_param("vol_spike_cooloff_s", "60"))
self.vol_spike_action = self._get_param("vol_spike_action", "pause")
# Additional safety/risk params
self.position_sizing_buffer = float(self._get_param("position_sizing_buffer", "0.7"))
self.max_symbol_exposure_pct = float(self._get_param("max_symbol_exposure_pct", "0.15"))
self.post_liquidation_cooldown_s = int(self._get_param("post_liquidation_cooldown_s", "180"))
self.max_open_orders_per_symbol = int(self._get_param("max_open_orders_per_symbol", "2"))
self.verbose_logs = (self._get_param("verbose_logs", "false").lower() == "true")
self.sigma_floor_dollars = float(self._get_param("sigma_floor_dollars", "5.0"))
self.max_trades_per_minute = int(self._get_param("max_trades_per_minute", "10"))
self.max_qty_per_trade = int(self._get_param("max_qty_per_trade", "25"))
# Reprice throttle
self.reprice_min_s = int(self._get_param("reprice_min_s", "1"))
self.reprice_max_per_order = int(self._get_param("reprice_max_per_order", "3"))
self.pending_entry_replace_count = 0
self.pending_entry_last_replace = None
# Variance-aware gating and per-category pause
self.markout_var_high_k = float(self._get_param("markout_var_high_k", "1.5"))
self.markout_var_strict_ratio = float(self._get_param("markout_var_strict_ratio", "1.20"))
self.min_samples_range = int(self._get_param("min_samples_range", "30"))
self.min_samples_break = int(self._get_param("min_samples_break", "30"))
self.cat_pause_lookback_s = int(self._get_param("cat_pause_lookback_s", "900"))
self.cat_pause_min_trades = int(self._get_param("cat_pause_min_trades", "3"))
self.cat_pause_minutes = int(self._get_param("cat_pause_minutes", "10"))
self.category_paused_until = {"fade_long": None, "fade_short": None, "break_long": None, "break_short": None}
# Breakout quality filters and buffers
self.sweep_window_s = int(self._get_param("sweep_window_s", "1"))
self.sweep_min_trades = int(self._get_param("sweep_min_trades", "6"))
self.sweep_sigma_mult = float(self._get_param("sweep_sigma_mult", "1.0"))
self.no_refill_persist_s = int(self._get_param("no_refill_persist_s", "1"))
# CSV/log cadence
self.csv_flush_minutes = int(self._get_param("csv_flush_minutes", "10"))
self.csv_flush_interval_s = max(60, 60 * self.csv_flush_minutes)
self.skip_log_min_s = int(self._get_param("skip_log_min_s", "30"))
self.last_skip_log_time = None
# ---- Backtest setup ----
try:
y, m, d = [int(x) for x in start.split("-")]
self.set_start_date(y, m, d)
except Exception:
self.set_start_date(2024, 1, 1)
# Optional end date
if end:
try:
y2, m2, d2 = [int(x) for x in end.split("-")]
self.set_end_date(y2, m2, d2)
except Exception:
pass
self.set_cash(cash)
# Brokerage/fee model
try:
# Prefer explicit brokerage param
if hasattr(BrokerageName, self.brokerage_param):
self.set_brokerage_model(getattr(BrokerageName, self.brokerage_param), AccountType.CASH)
elif self.fee_model in ["coinbase", "gdax"]:
try:
self.set_brokerage_model(BrokerageName.COINBASE, AccountType.CASH)
except Exception:
self.set_brokerage_model(BrokerageName.GDAX, AccountType.CASH)
except Exception:
pass
# Data subscription
# Select market (e.g., BINANCE, GDAX, COINBASE, KRAKEN)
try:
market_enum = getattr(Market, self.market_name)
except Exception:
market_enum = Market.BINANCE
self.eth_security = self.add_crypto(self.base_symbol, Resolution.TICK, market_enum)
# Optional zero-fee simulation
try:
if self.fee_model == "zero":
self.eth_security.set_fee_model(ConstantFeeModel(0))
except Exception:
pass
# Optional: conservative slippage modeling (disabled if not available)
# If desired later, use ConstantSlippageModel or a venue-specific model.
self.eth_symbol = self.eth_security.symbol
# Feature state
self.last_bid = None
self.last_ask = None
self.last_bid_size = None
self.last_ask_size = None
self.spread = None
self.mid = None
self.microprice = None
# Order Flow Imbalance (top-of-book approximation)
self.ofi = 0.0
# Cumulative Volume Delta (trade-aggressor signed volume)
self.cvd = 0.0
# Rolling mid history for short-horizon volatility (EWMA-like)
self.mid_window = deque(maxlen=200)
self.vol_ewma = 0.0
# Smoothed flow
self.ofi_ema = 0.0
self.cvd_ema = 0.0
self.last_ofi = 0.0
# Per-second flow persistence (last N seconds of OFI delta sign)
self.ofi_delta_last_seconds = deque(maxlen=10)
# Top-of-book change tracking to proxy cancel vs replenish
self.size_change_window = deque(maxlen=20) # store tuples (dt, dBid, dAsk)
# Markout tracking (expected short-horizon PnL after signal)
self.markout_horizons = [1, 5, 10] # seconds
self.min_samples_for_gating = 50
self.pending_markouts = {
"fade_long": [],
"fade_short": [],
"break_long": [],
"break_short": []
}
self.markout_results = {
cat: {h: deque(maxlen=400) for h in self.markout_horizons}
for cat in self.pending_markouts.keys()
}
# Execution/risk tracking
self.active_trade = None # {direction, entry_time, entry_price, stop_price, time_stop_due, qty, partial_taken, trail_anchor}
self.pending_entry_order_id = None
self.pending_entry_ticket = None
self.pending_entry_expires = None
self.last_trade_time = None
# Broker-side protective stop tracking
self.protective_stop_ticket = None
# Daily loss limit tracking
self.daily_start_equity = None
self.trading_paused = False
self.last_daily_reset = None
self.liquidation_cooldown_until = None
# Timing helpers
self.last_feature_log = None
self.last_flush = None
self.sigma_baseline = 0.0
self.vol_cooloff_until = None
# CSV buffers
self.csv_signals = [] # rows: time,regime,signal,conf,mid,spread,ofi,cvd,micro
self.csv_trades = [] # rows: time,side,price,qty,reason
self.csv_markouts = [] # rows: time,category,h,exp
# Entry fill timestamps for throttle
self.entry_fills_last_minute = deque(maxlen=1000)
# Market data health
self.first_data_time = None
self.seen_any_tick = False
self.seen_quote_tick = False
self.use_trade_mid_fallback = False
self.fallback_after_s = int(self._get_param("fallback_after_s", "120"))
self.debug("Initialized ETH microstructure scaffold")
# Last-signal context (for dynamic sizing)
self.last_signal_conf = 0
self.last_signal_edge = 0.0
self.last_signal_regime = None
self.last_signal_dir = None
# Realized PnL tracking buffers (used by protective stop exit logging)
self.realized_pnl_window = deque(maxlen=2000) # (time, pnl)
self.category_realized = {
"fade_long": deque(maxlen=500),
"fade_short": deque(maxlen=500),
"break_long": deque(maxlen=500),
"break_short": deque(maxlen=500)
}
# Buffers for breakout quality detection
self.trade_window = deque(maxlen=500)
self.ofi_persist_window = deque(maxlen=10)
self.no_refill_long_hist = deque(maxlen=5)
self.no_refill_short_hist = deque(maxlen=5)
self.micro_tilt_hist = deque(maxlen=5)
def on_data(self, data: Slice):
# Process quote ticks to maintain best bid/ask and top-of-book sizes
if self.eth_symbol in data.ticks:
ticks = data.ticks[self.eth_symbol]
if not self.seen_any_tick:
self.first_data_time = self.time
self.seen_any_tick = True
quote_ticks = [t for t in ticks if t.tick_type == TickType.QUOTE]
trade_ticks = [t for t in ticks if t.tick_type == TickType.TRADE]
for qt in quote_ticks:
self._update_top_of_book(qt)
if len(quote_ticks) > 0:
self.seen_quote_tick = True
for tt in trade_ticks:
self._update_trade_flow(tt)
# If no quote ticks but trades exist for a while, synthesize mid/spread from trades
if not self.seen_quote_tick and len(trade_ticks) > 0:
if self.first_data_time is None:
self.first_data_time = self.time
elapsed = (self.time - self.first_data_time).total_seconds()
if elapsed >= self.fallback_after_s:
self.use_trade_mid_fallback = True
self._update_mid_from_trade(trade_ticks[-1])
# Periodically compute features and evaluate signals (e.g., once per second)
if self.last_feature_log is None or (self.time - self.last_feature_log).total_seconds() >= 1:
self._compute_features()
self._update_daily_limits()
self._update_markouts()
self._log_features()
self._evaluate_and_trade()
self._manage_active_trade()
self.last_feature_log = self.time
# One-time warning if no data for a while
try:
if not self.seen_any_tick and self.first_data_time is None:
self.first_data_time = self.time
if (not self.seen_any_tick) and self.first_data_time is not None:
elapsed = (self.time - self.first_data_time).total_seconds()
if elapsed >= max(60, self.fallback_after_s) and getattr(self, '_warned_no_data', False) is not True:
self._warned_no_data = True
self.debug(f"warn: no market data received for {self.base_symbol} on {self.market_name}. Check symbol/market or date range.")
except Exception:
pass
# ---------- Feature maintenance ----------
def _update_top_of_book(self, qt: Tick):
bid = qt.bid_price
ask = qt.ask_price
bid_size = float(qt.bid_size) if qt.bid_size is not None else None
ask_size = float(qt.ask_size) if qt.ask_size is not None else None
if self.last_bid is not None and self.last_ask is not None and bid_size is not None and ask_size is not None and self.last_bid_size is not None and self.last_ask_size is not None:
if bid > self.last_bid:
self.ofi += bid_size
elif bid < self.last_bid:
self.ofi -= self.last_bid_size
else:
self.ofi += (bid_size - self.last_bid_size)
if ask < self.last_ask:
self.ofi += ask_size
elif ask > self.last_ask:
self.ofi -= self.last_ask_size
else:
self.ofi -= (ask_size - self.last_ask_size)
# Track size change window for cancel/replenish proxies
if self.last_bid_size is not None and self.last_ask_size is not None and bid_size is not None and ask_size is not None:
self.size_change_window.append((self.time, bid_size - self.last_bid_size, ask_size - self.last_ask_size))
self.last_bid = bid
self.last_ask = ask
self.last_bid_size = bid_size
self.last_ask_size = ask_size
if bid is not None and ask is not None and bid > 0 and ask > 0:
self.spread = ask - bid
self.mid = 0.5 * (ask + bid)
if bid_size is not None and ask_size is not None and (bid_size + ask_size) > 0:
self.microprice = (ask * bid_size + bid * ask_size) / (bid_size + ask_size)
else:
self.microprice = self.mid
self.mid_window.append(self.mid)
if len(self.mid_window) >= 2:
ret = (self.mid_window[-1] - self.mid_window[-2])
self.vol_ewma = (1 - self.alpha_vol) * self.vol_ewma + self.alpha_vol * (ret * ret)
def _update_trade_flow(self, tt: Tick):
if self.mid is None or tt.value is None or tt.quantity is None:
return
trade_price = tt.value
trade_qty = float(tt.quantity)
if trade_price >= self.mid:
self.cvd += trade_qty
else:
self.cvd -= trade_qty
# Record for sweep-burst detection window
try:
self.trade_window.append((self.time, float(trade_price), float(trade_qty)))
cutoff = self.time - timedelta(seconds=self.sweep_window_s)
while len(self.trade_window) > 0 and self.trade_window[0][0] < cutoff:
self.trade_window.popleft()
except Exception:
pass
def _update_mid_from_trade(self, tt: Tick):
try:
price = float(tt.value)
if price <= 0:
return
# Use symbol's minimum price variation if available to create a synthetic spread
try:
min_tick = 0.01
secs = getattr(self, 'Securities', None)
if secs is not None:
sec = secs.get(self.eth_symbol, None) if hasattr(secs, 'get') else secs[self.eth_symbol] if self.eth_symbol in secs else None
if sec is not None:
props = getattr(sec, 'SymbolProperties', None)
if props is not None:
mt = float(getattr(props, 'MinimumPriceVariation', 0.01))
if mt > 0:
min_tick = mt
except Exception:
min_tick = 0.01
self.last_bid = max(0.0, price - min_tick)
self.last_ask = price + min_tick
self.spread = self.last_ask - self.last_bid
self.mid = price
self.microprice = price
self.mid_window.append(self.mid)
if len(self.mid_window) >= 2:
ret = (self.mid_window[-1] - self.mid_window[-2])
self.vol_ewma = (1 - self.alpha_vol) * self.vol_ewma + self.alpha_vol * (ret * ret)
except Exception:
pass
def _compute_features(self):
# Smooth flows and compute per-second OFI delta sign
self.ofi_ema = (1 - self.alpha_ofi) * self.ofi_ema + self.alpha_ofi * self.ofi
self.cvd_ema = (1 - self.alpha_cvd) * self.cvd_ema + self.alpha_cvd * self.cvd
# OFI delta for persistence window
ofi_delta = self.ofi - self.last_ofi
self.last_ofi = self.ofi
self.ofi_delta_last_seconds.append(ofi_delta)
# Update baseline sigma and detect spikes
try:
sigma_now = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else 0.0
self.sigma_baseline = (1 - self.alpha_vol_baseline) * self.sigma_baseline + self.alpha_vol_baseline * sigma_now
if self.sigma_baseline > 0 and sigma_now > float(self.vol_spike_mult) * self.sigma_baseline:
self.vol_cooloff_until = self.time + timedelta(seconds=self.vol_spike_cooloff_s)
except Exception:
pass
self.ofi_persist_window.append(ofi_delta)
# Compute replenish/cancel proxies over recent window
# Positive bid delta implies replenish; negative implies cancel. Opposite for ask side.
# Aggregate over last ~3 seconds worth of events
cutoff = self.time - timedelta(seconds=3)
bid_replenish = 0.0
ask_replenish = 0.0
for ts, d_bid, d_ask in list(self.size_change_window):
if ts < cutoff:
continue
if d_bid > 0:
bid_replenish += d_bid
if d_ask > 0:
ask_replenish += d_ask
self.bid_replenish_score = bid_replenish
self.ask_replenish_score = ask_replenish
# Persist booleans and microprice tilt for quality filters
try:
self.no_refill_long_hist.append(self.ask_replenish_score <= 0)
self.no_refill_short_hist.append(self.bid_replenish_score <= 0)
tilt = 1 if (self.microprice - self.mid) > 0 else (-1 if (self.microprice - self.mid) < 0 else 0)
self.micro_tilt_hist.append(tilt)
except Exception:
pass
def _log_features(self):
if not getattr(self, 'verbose_logs', False):
return
if self.mid is None or self.spread is None or self.microprice is None:
return
self.debug(f"t={self.time} mid={self.mid:.2f} spr={self.spread:.2f} micro={self.microprice:.2f} ofi={self.ofi:.2f} cvd={self.cvd:.2f} volEWMA={self.vol_ewma:.6f}")
# ---------- Markouts ----------
def _register_signal_markout(self, category: str):
if self.mid is None:
return
anchor_price = self.mid
now = self.time
entry = {
"t0": now,
"p0": anchor_price,
"due_times": {h: now + timedelta(seconds=h) for h in self.markout_horizons}
}
self.pending_markouts[category].append(entry)
def _update_markouts(self):
if self.mid is None:
return
now = self.time
for category, entries in self.pending_markouts.items():
i = 0
while i < len(entries):
entry = entries[i]
matured = []
for h, due in entry["due_times"].items():
if now >= due:
matured.append(h)
for h in matured:
pnl = self.mid - entry["p0"]
self.markout_results[category][h].append(pnl)
# buffer markout observation
try:
self.csv_markouts.append([str(now), category, h, pnl])
except Exception:
pass
del entry["due_times"][h]
if not entry["due_times"]:
entries.pop(i)
else:
i += 1
def _expected_markout(self, category: str, horizon: int) -> float:
arr = self.markout_results[category][horizon]
if len(arr) == 0:
return 0.0
if self.markout_decay_alpha <= 0.0:
return sum(arr) / float(len(arr))
# Exponentially decay-weighted mean (recent observations count more)
alpha = max(0.0, min(1.0, self.markout_decay_alpha))
ema = 0.0
initialized = False
for v in reversed(arr):
if not initialized:
ema = float(v)
initialized = True
else:
ema = (1 - alpha) * ema + alpha * float(v)
return ema
def _estimate_trade_cost(self, direction: str, mode: str) -> float:
if self.mid is None:
return 0.0
half_spread = (self.spread * 0.5) if self.spread is not None else 0.0
fee_rate = self.maker_fee_rate if (mode == "fade" and self.use_maker_for_fades) else self.taker_fee_rate
fee_cost = fee_rate * self.mid
return half_spread + fee_cost
def _passes_markout_gating(self, regime: str, direction: str) -> bool:
if regime == "range":
category = "fade_long" if direction == "long" else "fade_short"
mode = "fade"
else:
category = "break_long" if direction == "long" else "break_short"
mode = "break"
total_samples = sum(len(self.markout_results[category][h]) for h in self.markout_horizons)
min_needed = self.min_samples_range if regime == "range" else self.min_samples_break
if total_samples < min_needed:
return True
sign = 1.0 if direction == "long" else -1.0
exp1 = sign * self._expected_markout(category, 1)
exp5 = sign * self._expected_markout(category, 5)
exp10 = sign * self._expected_markout(category, 10)
denom = max(1e-9, (self.markout_w1 + self.markout_w5 + self.markout_w10))
exp = (self.markout_w1 * exp1 + self.markout_w5 * exp5 + self.markout_w10 * exp10) / denom
cost = self._estimate_trade_cost(direction, mode)
# Confidence band around zero: require exp to beat cost by band
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
band = float(self.edge_band_sigma_mult) * (sigma_price if sigma_price is not None else 0.0)
if not (exp > (cost + band)):
return False
# Variance-aware strictness
try:
vals = []
for h in [1, 5]:
arr = list(self.markout_results[category][h])
if len(arr) > 1:
m = sum(arr) / float(len(arr))
var = sum((x - m) * (x - m) for x in arr) / float(len(arr) - 1)
vals.append(var)
var_w = sum(vals) / float(len(vals)) if len(vals) > 0 else 0.0
sigma_norm = (sigma_price if sigma_price is not None and sigma_price > 0 else 1.0)
var_norm = math.sqrt(max(0.0, var_w)) / sigma_norm
if var_norm > float(self.markout_var_high_k):
ratio = (exp / max(1e-9, cost)) if cost > 0 else 999.0
if ratio < float(self.markout_var_strict_ratio):
return False
except Exception:
pass
# Per-category pause
try:
if self._category_paused(category):
return False
except Exception:
pass
return True
def _category_paused(self, category: str) -> bool:
try:
until = self.category_paused_until.get(category)
if until is not None and self.time < until:
return True
cutoff = self.time - timedelta(seconds=self.cat_pause_lookback_s)
dq = self.category_realized.get(category, deque())
recent = [p for (ts, p) in dq if ts >= cutoff]
if len(recent) >= self.cat_pause_min_trades and sum(recent) <= 0.0:
self.category_paused_until[category] = self.time + timedelta(minutes=self.cat_pause_minutes)
if self.verbose_logs:
self.debug(f"pause: category {category} due to recent PnL <= 0")
return True
except Exception:
pass
return False
# ---------- Signal + execution ----------
def _evaluate_and_trade(self):
regime = self._classify_regime()
# Volatility spike cooloff gate (pause only; widening optional in future)
try:
if self.vol_cooloff_until is not None and self.time < self.vol_cooloff_until and self.vol_spike_action == "pause":
if self.verbose_logs:
self.debug("skip: vol spike cooloff active")
return
except Exception:
pass
if regime == "range":
signal = self._absorption_fade_signal()
category_long = "fade_long"
category_short = "fade_short"
else:
signal = self._sweep_breakout_signal()
category_long = "break_long"
category_short = "break_short"
if signal is None:
if self.verbose_logs:
try:
self.debug(f"skip: no signal (regime={regime})")
except Exception:
pass
return
if signal == "long":
self._register_signal_markout(category_long)
elif signal == "short":
self._register_signal_markout(category_short)
# Store latest signal context for sizing
try:
self.last_signal_regime = regime
self.last_signal_dir = signal
except Exception:
pass
if not self._passes_markout_gating(regime, signal):
if self.verbose_logs:
try:
mode = "fade" if regime == "range" else "break"
cost = self._estimate_trade_cost(signal, mode)
if regime == "range":
cat = "fade_long" if signal == "long" else "fade_short"
else:
cat = "break_long" if signal == "long" else "break_short"
sgn = 1 if signal == "long" else -1
e1 = sgn * self._expected_markout(cat, 1)
e5 = sgn * self._expected_markout(cat, 5)
e10 = sgn * self._expected_markout(cat, 10)
denom = max(1e-9, (self.markout_w1 + self.markout_w5 + self.markout_w10))
exp = (self.markout_w1 * e1 + self.markout_w5 * e5 + self.markout_w10 * e10) / denom
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
band = float(self.edge_band_sigma_mult) * (sigma_price if sigma_price is not None else 0.0)
thr = cost + band
# Rate-limit skip logs
if self.last_skip_log_time is None or (self.time - self.last_skip_log_time).total_seconds() >= self.skip_log_min_s:
self.debug(f"skip: markout gating exp={exp:.4f} <= cost+band={thr:.4f} (regime={regime}, dir={signal})")
self.last_skip_log_time = self.time
except Exception:
pass
return
else:
# Capture edge estimate for dynamic sizing
try:
mode = "fade" if regime == "range" else "break"
if regime == "range":
cat = "fade_long" if signal == "long" else "fade_short"
else:
cat = "break_long" if signal == "long" else "break_short"
sgn = 1 if signal == "long" else -1
e1 = sgn * self._expected_markout(cat, 1)
e5 = sgn * self._expected_markout(cat, 5)
e10 = sgn * self._expected_markout(cat, 10)
denom = max(1e-9, (self.markout_w1 + self.markout_w5 + self.markout_w10))
edge_signed = (self.markout_w1 * e1 + self.markout_w5 * e5 + self.markout_w10 * e10) / denom
edge = max(0.0, edge_signed - self._estimate_trade_cost(signal, mode))
self.last_signal_edge = float(edge)
except Exception:
self.last_signal_edge = 0.0
# Safety gates
if self.spread is not None and self.spread > self.max_spread_allowed:
if self.verbose_logs:
try:
self.debug(f"skip: spread {self.spread:.2f} > max {self.max_spread_allowed:.2f}")
except Exception:
pass
return
if self.trading_paused:
if self.verbose_logs:
try:
self.debug("skip: daily loss limit pause active")
except Exception:
pass
return
# Cooldown after forced liquidation or margin warning
if self._in_liquidation_cooldown():
if self.verbose_logs:
try:
left = (self.liquidation_cooldown_until - self.time).total_seconds() if self.liquidation_cooldown_until else 0
self.debug(f"skip: liquidation cooldown {max(0,int(left))}s left")
except Exception:
pass
return
if self.last_trade_time is not None and (self.time - self.last_trade_time).total_seconds() < self.trade_cooldown_s:
if self.verbose_logs:
try:
waited = (self.time - self.last_trade_time).total_seconds()
self.debug(f"skip: cooldown {waited:.0f}s/{self.trade_cooldown_s}s")
except Exception:
pass
return
# Per-minute entry throttle (use filled entries record)
if self._recent_entry_count(60) >= int(self.max_trades_per_minute):
if self.verbose_logs:
try:
self.debug("skip: trade throttle (per-minute cap)")
except Exception:
pass
return
# Positions cap (future-proof for multi-asset)
try:
if self._num_open_positions() >= self.max_positions and not self.portfolio[self.eth_symbol].invested:
if self.verbose_logs:
self.debug(f"skip: max positions cap {self.max_positions}")
return
except Exception:
pass
# Limit open orders per symbol
try:
if self._too_many_open_orders():
if self.verbose_logs:
self.debug("skip: open order cap reached")
return
except Exception:
pass
# Cap per-symbol exposure by value
try:
if self._symbol_exposure_ratio() >= float(self.max_symbol_exposure_pct):
if self.verbose_logs:
self.debug("skip: symbol exposure cap")
return
except Exception:
pass
if regime == "range":
if signal == "long":
self._enter_long_passive()
elif signal == "short":
self._enter_short_passive()
else:
if signal == "long":
self._enter_long_breakout()
elif signal == "short":
self._enter_short_breakout()
# manage expiry of any still-open entry
self._expire_pending_entry()
def _classify_regime(self) -> str:
if self.spread is None or self.vol_ewma is None:
return "range"
# Persistence of OFI sign across last seconds
pos = sum(1 for x in self.ofi_delta_last_seconds if x > 0)
neg = sum(1 for x in self.ofi_delta_last_seconds if x < 0)
ofi_persistence = abs(pos - neg)
# Compare volatility to spread; higher multiple indicates breakout regime
if self.vol_ewma > 2.0 * (self.spread * self.spread) and ofi_persistence >= 6:
return "breakout"
return "range"
def _absorption_fade_signal(self):
if self.mid is None or self.microprice is None or self.spread is None:
return None
# Build confirmations (need 2 of 3)
micro_tilt_up = (self.microprice - self.mid) >= self.micro_tilt_threshold
micro_tilt_dn = (self.mid - self.microprice) >= self.micro_tilt_threshold
sell_flow_no_progress = (self.cvd_ema < 0 and abs(self.mid_window[-1] - self.mid_window[0]) < max(1.0, self.spread)) if len(self.mid_window) >= 2 else (self.cvd_ema < 0)
buy_flow_no_progress = (self.cvd_ema > 0 and abs(self.mid_window[-1] - self.mid_window[0]) < max(1.0, self.spread)) if len(self.mid_window) >= 2 else (self.cvd_ema > 0)
bid_replenish = (self.bid_replenish_score > 0)
ask_replenish = (self.ask_replenish_score > 0)
conf_long = sum([sell_flow_no_progress, bid_replenish, micro_tilt_up])
conf_short = sum([buy_flow_no_progress, ask_replenish, micro_tilt_dn])
if self.spread <= self.max_spread_allowed and conf_long >= 2:
self._buffer_signal("range", "long", conf_long)
self.last_signal_conf = int(conf_long)
return "long"
if self.spread <= self.max_spread_allowed and conf_short >= 2:
self._buffer_signal("range", "short", conf_short)
self.last_signal_conf = int(conf_short)
return "short"
return None
def _sweep_breakout_signal(self):
if self.mid is None or self.microprice is None:
return None
# Approximations without full L2, enhanced with sweep-burst and persistence
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
# Sweep burst
sweep_ok = False
try:
if len(self.trade_window) >= self.sweep_min_trades and sigma_price is not None and sigma_price > 0:
p0 = self.trade_window[0][1]; p1 = self.trade_window[-1][1]
if abs(p1 - p0) >= float(self.sweep_sigma_mult) * sigma_price:
sweep_ok = True
except Exception:
sweep_ok = False
big_ofi = abs(self.ofi_persist_window[-1]) if len(self.ofi_persist_window) > 0 else 0.0
ofi_unidirectional = sum(1 for x in self.ofi_persist_window if x > 0) >= 7 or sum(1 for x in self.ofi_persist_window if x < 0) >= 7
# No-refill persistence over last no_refill_persist_s seconds
no_refill_long = (self.ask_replenish_score <= 0) and (sum(1 for b in list(self.no_refill_long_hist)[-self.no_refill_persist_s:] if b) == self.no_refill_persist_s)
no_refill_short = (self.bid_replenish_score <= 0) and (sum(1 for b in list(self.no_refill_short_hist)[-self.no_refill_persist_s:] if b) == self.no_refill_persist_s)
# Micro anti-lean: avoid opposite tilt in recent seconds
micro_lean_up = (self.microprice > self.mid) and (min(list(self.micro_tilt_hist)[-self.no_refill_persist_s:] or [1]) >= 0)
micro_lean_dn = (self.microprice < self.mid) and (max(list(self.micro_tilt_hist)[-self.no_refill_persist_s:] or [-1]) <= 0)
conf_long = sum([ofi_unidirectional and big_ofi > 0, no_refill_long, micro_lean_up, sweep_ok])
conf_short = sum([ofi_unidirectional and big_ofi > 0, no_refill_short, micro_lean_dn, sweep_ok])
if conf_long >= 2:
self._buffer_signal("breakout", "long", conf_long)
self.last_signal_conf = int(conf_long)
return "long"
if conf_short >= 2:
self._buffer_signal("breakout", "short", conf_short)
self.last_signal_conf = int(conf_short)
return "short"
return None
# ---------- Execution + risk helpers ----------
def _enter_long_passive(self):
if self.portfolio[self.eth_symbol].invested and self.portfolio[self.eth_symbol].quantity > 0:
return
qty = self._position_size(direction=1)
qty = self._apply_sizing_guards(qty)
if qty <= 0:
return
limit_price = self.last_bid if self.last_bid is not None else None
if limit_price is None:
return
if self.trading_paused:
return
# Heat gating using fade stop distance approximation
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
stop_distance = self.stop_vol_k * sigma_price
if self._would_exceed_heat(qty, stop_distance):
if self.verbose_logs:
try:
self.debug("skip: portfolio heat cap")
except Exception:
pass
return
if self._has_open_entry():
self._reprice_entry_if_needed("long", limit_price)
return
ticket = self.limit_order(self.eth_symbol, qty, limit_price)
if ticket is not None and ticket.order_id > 0:
self.pending_entry_order_id = ticket.order_id
self.pending_entry_ticket = ticket
self.pending_entry_expires = self.time + timedelta(seconds=3)
self.pending_entry_replace_count = 0
self.pending_entry_last_replace = self.time
try:
self.debug(f"entry: passive long qty={qty} px={limit_price:.2f}")
except Exception:
pass
def _enter_short_passive(self):
if self.portfolio[self.eth_symbol].invested and self.portfolio[self.eth_symbol].quantity < 0:
return
qty = self._position_size(direction=-1)
qty = self._apply_sizing_guards(qty)
if qty >= 0:
return
limit_price = self.last_ask if self.last_ask is not None else None
if limit_price is None:
return
if self.trading_paused:
return
# Heat gating using fade stop distance approximation
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
stop_distance = self.stop_vol_k * sigma_price
if self._would_exceed_heat(qty, stop_distance):
if self.verbose_logs:
try:
self.debug("skip: portfolio heat cap")
except Exception:
pass
return
if self._has_open_entry():
self._reprice_entry_if_needed("short", limit_price)
return
ticket = self.limit_order(self.eth_symbol, qty, limit_price)
if ticket is not None and ticket.order_id > 0:
self.pending_entry_order_id = ticket.order_id
self.pending_entry_ticket = ticket
self.pending_entry_expires = self.time + timedelta(seconds=3)
self.pending_entry_replace_count = 0
self.pending_entry_last_replace = self.time
try:
self.debug(f"entry: passive short qty={qty} px={limit_price:.2f}")
except Exception:
pass
def _enter_long_breakout(self):
if self.portfolio[self.eth_symbol].invested and self.portfolio[self.eth_symbol].quantity > 0:
return
qty = self._position_size(direction=1)
qty = self._apply_sizing_guards(qty)
if qty <= 0 or self.mid is None:
return
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
stop_price = self.mid + self.breakout_sigma_band * sigma_price
limit_price = stop_price + self.breakout_limit_extra_sigma * sigma_price
if self.trading_paused:
return
if self._would_exceed_heat(qty, self.breakout_sigma_band * sigma_price):
try:
self.debug("skip: portfolio heat cap")
except Exception:
pass
return
if self._has_open_entry():
# If we already have a pending breakout order, leave it or cancel/replace if far
self._reprice_entry_if_needed("long_break", limit_price, stop_price)
return
ticket = self.stop_limit_order(self.eth_symbol, qty, stop_price, limit_price)
if ticket is not None and ticket.order_id > 0:
self.pending_entry_order_id = ticket.order_id
self.pending_entry_ticket = ticket
self.pending_entry_expires = self.time + timedelta(seconds=5)
self.pending_entry_replace_count = 0
self.pending_entry_last_replace = self.time
try:
self.debug(f"entry: breakout long qty={qty} stop={stop_price:.2f} limit={limit_price:.2f}")
except Exception:
pass
def _enter_short_breakout(self):
if self.portfolio[self.eth_symbol].invested and self.portfolio[self.eth_symbol].quantity < 0:
return
qty = self._position_size(direction=-1)
qty = self._apply_sizing_guards(qty)
if qty >= 0 or self.mid is None:
return
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
stop_price = self.mid - self.breakout_sigma_band * sigma_price
limit_price = stop_price - self.breakout_limit_extra_sigma * sigma_price
if self.trading_paused:
return
if self._would_exceed_heat(qty, self.breakout_sigma_band * sigma_price):
try:
self.debug("skip: portfolio heat cap")
except Exception:
pass
return
if self._has_open_entry():
self._reprice_entry_if_needed("short_break", limit_price, stop_price)
return
ticket = self.stop_limit_order(self.eth_symbol, qty, stop_price, limit_price)
if ticket is not None and ticket.order_id > 0:
self.pending_entry_order_id = ticket.order_id
self.pending_entry_ticket = ticket
self.pending_entry_expires = self.time + timedelta(seconds=5)
self.pending_entry_replace_count = 0
self.pending_entry_last_replace = self.time
try:
self.debug(f"entry: breakout short qty={qty} stop={stop_price:.2f} limit={limit_price:.2f}")
except Exception:
pass
def _init_risk_tracker(self, direction: int, entry_price: float, qty: int):
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
stop_distance = self.stop_vol_k * sigma_price
stop_price = entry_price - stop_distance if direction > 0 else entry_price + stop_distance
self.active_trade = {
"direction": direction,
"entry_time": self.time,
"entry_price": entry_price,
"stop_price": stop_price,
"time_stop_due": self.time + timedelta(seconds=self.time_stop_seconds),
"qty": qty,
"partial_taken": False,
"trail_anchor": entry_price,
"protect_arm_time": self.time + timedelta(seconds=self.protective_stop_grace_s)
}
self.last_trade_time = self.time
try:
side = "long" if direction > 0 else "short"
self.csv_trades.append([str(self.time), side, entry_price, qty, "entry"])
except Exception:
pass
# Place broker-side protective stop
try:
self._place_protective_stop()
except Exception:
pass
def _manage_active_trade(self):
if not self.active_trade or self.mid is None:
return
direction = self.active_trade["direction"]
stop_price = self.active_trade["stop_price"]
qty = self.active_trade["qty"]
entry_price = self.active_trade["entry_price"]
# Time stop
if self.time >= self.active_trade["time_stop_due"]:
self.liquidate(self.eth_symbol)
self.active_trade = None
return
# Take-profit partial
if not self.active_trade["partial_taken"]:
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
tp_distance = self.partial_tp_target_sigma * sigma_price
target = entry_price + tp_distance if direction > 0 else entry_price - tp_distance
if (direction > 0 and self.mid >= target) or (direction < 0 and self.mid <= target):
tp_qty = int(max(1, abs(qty) * self.partial_tp_ratio)) * (1 if direction > 0 else -1)
self.market_order(self.eth_symbol, -tp_qty)
self.active_trade["partial_taken"] = True
self.active_trade["trail_anchor"] = self.mid
# Move stop to breakeven + epsilon and sync protective stop
try:
eps = float(self.breakeven_epsilon_sigma) * sigma_price
if direction > 0:
new_stop = max(self.active_trade["stop_price"], entry_price + eps)
else:
new_stop = min(self.active_trade["stop_price"], entry_price - eps)
if (direction > 0 and new_stop > self.active_trade["stop_price"]) or (direction < 0 and new_stop < self.active_trade["stop_price"]):
self.active_trade["stop_price"] = new_stop
self._update_protective_stop_price(new_stop)
except Exception:
pass
try:
# Re-sync protective stop quantity after partial
self._place_protective_stop()
except Exception:
pass
# Microprice-based trailing stop
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
trail_step = self.trail_sigma * sigma_price
if direction > 0:
if self.mid > self.active_trade["trail_anchor"]:
self.active_trade["trail_anchor"] = self.mid
new_stop = self.active_trade["trail_anchor"] - trail_step
if new_stop > stop_price:
self.active_trade["stop_price"] = new_stop
try:
self._update_protective_stop_price(new_stop)
except Exception:
pass
else:
if self.mid < self.active_trade["trail_anchor"]:
self.active_trade["trail_anchor"] = self.mid
new_stop = self.active_trade["trail_anchor"] + trail_step
if new_stop < stop_price:
self.active_trade["stop_price"] = new_stop
try:
self._update_protective_stop_price(new_stop)
except Exception:
pass
# Impulse profit lock: only tightens risk or reduces size (never increases)
if self.impulse_lock_enabled:
try:
elapsed_s = (self.time - self.active_trade["entry_time"]).total_seconds()
if elapsed_s <= float(self.impulse_lock_window_s):
favorable = (self.mid - entry_price) * (1 if direction > 0 else -1)
if favorable is not None and sigma_price > 0 and favorable >= float(self.impulse_lock_sigma) * sigma_price:
# Scale out a portion safely
scale_qty = int(max(1, abs(self.active_trade["qty"]) * float(self.impulse_scaleout_ratio)))
scale_qty *= (1 if direction > 0 else -1)
self.market_order(self.eth_symbol, -scale_qty)
# Update tracked qty to current holdings
try:
self.active_trade["qty"] = int(self.portfolio[self.eth_symbol].quantity)
except Exception:
pass
# Tighten stop to breakeven + small trail proportion of sigma
lock_step = max(0.0, float(self.impulse_trail_sigma) * sigma_price)
if direction > 0:
new_lock = max(self.active_trade["stop_price"], entry_price + lock_step)
if new_lock > self.active_trade["stop_price"]:
self.active_trade["stop_price"] = new_lock
self._update_protective_stop_price(new_lock)
else:
new_lock = min(self.active_trade["stop_price"], entry_price - lock_step)
if new_lock < self.active_trade["stop_price"]:
self.active_trade["stop_price"] = new_lock
self._update_protective_stop_price(new_lock)
# Ensure we don't run this multiple times
self.impulse_lock_enabled = False
except Exception:
pass
# Price-based hard stop
stop_price = self.active_trade["stop_price"]
if direction > 0 and self.mid <= stop_price:
self.liquidate(self.eth_symbol)
self.active_trade = None
try:
self._cancel_protective_stop()
except Exception:
pass
return
if direction < 0 and self.mid >= stop_price:
self.liquidate(self.eth_symbol)
self.active_trade = None
try:
self._cancel_protective_stop()
except Exception:
pass
return
# Adaptive time stop: if momentum is weak early, shorten time stop
try:
elapsed = (self.time - self.active_trade["entry_time"]).total_seconds()
win = float(self.adaptive_momentum_window_s)
if elapsed >= win:
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
favorable = (self.mid - entry_price) * (1 if direction > 0 else -1)
if sigma_price is None:
sigma_price = 0.0
if favorable < float(self.adaptive_momentum_sigma) * sigma_price:
min_due = self.active_trade["entry_time"] + timedelta(seconds=int(self.adaptive_time_stop_min_s))
if self.active_trade["time_stop_due"] > min_due:
self.active_trade["time_stop_due"] = min_due
except Exception:
pass
# Periodically flush CSV buffers
if self.last_flush is None or (self.time - self.last_flush).total_seconds() >= self.csv_flush_interval_s:
self._flush_csv()
self.last_flush = self.time
def on_end_of_algorithm(self):
try:
self._flush_csv()
except Exception:
pass
# ---------- Protective stop helpers ----------
def _place_protective_stop(self):
try:
if not self.active_trade:
return
# honor grace period after entry
if self.active_trade.get("protect_arm_time") is not None and self.time < self.active_trade["protect_arm_time"]:
return
qty_now = int(self.portfolio[self.eth_symbol].quantity)
if qty_now == 0:
return
stop_price = float(self.active_trade.get("stop_price", 0.0))
# Apply fail-safe cushion to reduce premature triggers
try:
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 0.0)
cushion = float(self.protective_stop_cushion_sigma) * (sigma_price if sigma_price is not None else 0.0)
if qty_now > 0:
stop_price = max(0.0, stop_price - cushion)
elif qty_now < 0:
stop_price = stop_price + cushion
except Exception:
pass
if stop_price <= 0.0:
return
desired_qty = -qty_now # opposite side to close position fully
# If a stop already exists, cancel and replace to ensure correct quantity
if self.protective_stop_ticket is not None:
try:
self.protective_stop_ticket.cancel()
except Exception:
pass
self.protective_stop_ticket = None
ticket = self.stop_market_order(self.eth_symbol, desired_qty, stop_price)
if ticket is not None and ticket.order_id > 0:
self.protective_stop_ticket = ticket
except Exception:
pass
def _update_protective_stop_price(self, new_stop_price: float):
try:
if self.protective_stop_ticket is None or new_stop_price is None or new_stop_price <= 0.0:
return
# Keep cushion relative to internal stop when updating
try:
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 0.0)
cushion = float(self.protective_stop_cushion_sigma) * (sigma_price if sigma_price is not None else 0.0)
qty_now = int(self.portfolio[self.eth_symbol].quantity)
if qty_now > 0:
new_stop_price = max(0.0, new_stop_price - cushion)
elif qty_now < 0:
new_stop_price = new_stop_price + cushion
except Exception:
pass
fields = UpdateOrderFields()
fields.stop_price = float(new_stop_price)
try:
# Some QC builds use PascalCase
setattr(fields, 'StopPrice', float(new_stop_price))
except Exception:
pass
self.protective_stop_ticket.update(fields)
except Exception:
pass
def _cancel_protective_stop(self):
try:
if self.protective_stop_ticket is not None:
self.protective_stop_ticket.cancel()
except Exception:
pass
self.protective_stop_ticket = None
# ---------- Entry order management ----------
def _has_open_entry(self) -> bool:
try:
# Prefer our stored ticket
if self.pending_entry_ticket is not None:
ord_status = self.pending_entry_ticket.status
if ord_status in [OrderStatus.NEW, OrderStatus.SUBMITTED, OrderStatus.PARTIALLY_FILLED]:
return True
# Fallback: check open orders for the symbol
tx = getattr(self, 'Transactions', None)
if tx is None:
return False
open_orders = tx.GetOpenOrders(self.eth_symbol)
return len(open_orders) > 0
except Exception:
return False
def _cancel_pending_entry(self):
try:
if self.pending_entry_ticket is not None:
self.pending_entry_ticket.cancel()
except Exception:
try:
if self.pending_entry_order_id is not None:
self.Transactions.CancelOrder(self.pending_entry_order_id)
except Exception:
pass
self.pending_entry_ticket = None
self.pending_entry_order_id = None
self.pending_entry_expires = None
def _expire_pending_entry(self):
try:
if self.pending_entry_expires is not None and self.time >= self.pending_entry_expires and self._has_open_entry():
self.debug("expire: cancel stale entry order")
self._cancel_pending_entry()
except Exception:
pass
def _reprice_entry_if_needed(self, side: str, desired_limit: float, desired_stop: float = None):
try:
if not self._has_open_entry():
return
# Determine current price on ticket if we can
current_limit = None
current_stop = None
if self.pending_entry_ticket is not None:
try:
res_limit = self.pending_entry_ticket.Get(OrderField.LIMIT_PRICE)
if getattr(res_limit, 'IsSuccess', False) and getattr(res_limit, 'Value', None) is not None:
current_limit = float(res_limit.Value)
except Exception:
pass
try:
res_stop = self.pending_entry_ticket.Get(OrderField.STOP_PRICE)
if getattr(res_stop, 'IsSuccess', False) and getattr(res_stop, 'Value', None) is not None:
current_stop = float(res_stop.Value)
except Exception:
pass
# If price already aligned closely, do nothing
if current_limit is not None and abs(current_limit - desired_limit) <= (self.spread or 1.0) * 0.1:
return
# Reprice throttle and max replaces
try:
if self.pending_entry_last_replace is not None:
if (self.time - self.pending_entry_last_replace).total_seconds() < float(self.reprice_min_s):
return
if self.pending_entry_replace_count is not None and self.pending_entry_replace_count >= int(self.reprice_max_per_order):
return
except Exception:
pass
# Otherwise cancel and allow next cycle to place a new aligned order
self._cancel_pending_entry()
try:
self.pending_entry_replace_count = (self.pending_entry_replace_count or 0) + 1
self.pending_entry_last_replace = self.time
except Exception:
pass
try:
self.debug("reprice: canceled pending entry; will re-place next cycle")
except Exception:
pass
return
except Exception:
pass
def _position_size(self, direction: int) -> int:
# Use QC API casing and coerce to float to avoid comparison issues
try:
pf = getattr(self, 'Portfolio', None)
pv = float(pf.TotalPortfolioValue) if pf is not None else 0.0
except Exception:
pv = float(getattr(getattr(self, 'portfolio', {}), 'total_portfolio_value', 0.0))
price = float(self.mid) if self.mid is not None else None
if price is None or price <= 0.0:
return 0
# Value cap sizing
max_value = float(self.max_position_value) * pv
value_qty_cap = int(max_value / price) if price > 0 else 0
# Risk-based sizing using σ-scaled stop distance
sigma_price = math.sqrt(self.vol_ewma) if self.vol_ewma > 0 else (self.spread if self.spread else 1.0)
sigma_for_sizing = max(float(self.sigma_floor_dollars), sigma_price if sigma_price is not None else 0.0)
stop_distance = max(0.0, self.stop_vol_k * sigma_for_sizing)
if stop_distance > 0.0:
risk_cap_value = float(self.risk_per_trade_pct) * pv
risk_qty_cap = int(risk_cap_value / stop_distance)
else:
risk_qty_cap = value_qty_cap
qty_unsigned = max(0, min(value_qty_cap, risk_qty_cap))
# Dynamic multipliers: confirmations and edge
try:
conf = int(self.last_signal_conf) if self.last_signal_conf is not None else 0
conf_adj = max(0.0, 1.0 + (conf - 2) * float(self.size_mult_per_confirmation))
except Exception:
conf_adj = 1.0
try:
sigma_unit = sigma_price if sigma_price > 0 else 1.0
edge_units = float(self.last_signal_edge) / sigma_unit
edge_adj = 1.0 + float(self.size_edge_k) * max(0.0, min(1.0, edge_units))
except Exception:
edge_adj = 1.0
qty_unsigned = int(qty_unsigned * conf_adj * edge_adj)
# Hard clamp by configured max quantity per trade
try:
max_qty = int(self.max_qty_per_trade)
if max_qty > 0:
qty_unsigned = min(qty_unsigned, max_qty)
except Exception:
pass
# Enforce min notional
if price * qty_unsigned < float(self.min_trade_notional):
return 0
return qty_unsigned * (1 if direction > 0 else -1)
# ---------- Portfolio risk helpers ----------
def _update_daily_limits(self):
try:
today = self.time.date()
if self.last_daily_reset is None or self.last_daily_reset != today:
self.daily_start_equity = float(self.Portfolio.TotalPortfolioValue)
self.trading_paused = False
self.last_daily_reset = today
pf = getattr(self, 'Portfolio', None)
pv = float(pf.TotalPortfolioValue) if pf is not None else 0.0
if self.daily_start_equity is not None and not self.trading_paused:
if pv <= (1.0 - float(self.daily_loss_limit_pct)) * self.daily_start_equity:
self.trading_paused = True
try:
self.debug("pause: daily loss limit hit")
except Exception:
pass
except Exception:
pass
def _current_portfolio_heat_ratio(self) -> float:
try:
if not self.active_trade or self.mid is None:
return 0.0
qty = abs(self.active_trade.get("qty", 0))
stop_price = self.active_trade.get("stop_price", None)
ref = self.mid if self.mid is not None else self.active_trade.get("entry_price", 0.0)
if stop_price is None:
return 0.0
risk_per_unit = abs(ref - float(stop_price))
pf = getattr(self, 'Portfolio', None)
pv = float(pf.TotalPortfolioValue) if pf is not None else 0.0
if pv <= 0.0:
return 0.0
return (risk_per_unit * qty) / pv
except Exception:
return 0.0
def _would_exceed_heat(self, add_qty: int, stop_distance: float) -> bool:
try:
pv = float(self.Portfolio.TotalPortfolioValue)
if pv <= 0.0:
return True
proposed = (abs(add_qty) * max(0.0, float(stop_distance))) / pv
return (self._current_portfolio_heat_ratio() + proposed) > float(self.max_portfolio_heat_pct)
except Exception:
return False
# ---------- Additional safety helpers ----------
def _symbol_exposure_ratio(self) -> float:
try:
pv = float(self.Portfolio.TotalPortfolioValue)
if pv <= 0.0:
return 0.0
hv = 0.0
try:
hv = float(self.Securities[self.eth_symbol].Holdings.HoldingsValue)
except Exception:
try:
hv = float(self.portfolio[self.eth_symbol].holdings_value)
except Exception:
hv = 0.0
return abs(hv) / pv
except Exception:
return 0.0
def _available_cash(self) -> float:
try:
pf = getattr(self, 'Portfolio', None)
return float(pf.Cash) if pf is not None else 0.0
except Exception:
try:
return float(self.portfolio.cash)
except Exception:
return 0.0
def _apply_sizing_guards(self, qty: int) -> int:
try:
if qty == 0 or self.mid is None or self.mid <= 0.0:
return 0
# Conservative buffer
qty = int(qty * float(self.position_sizing_buffer))
if qty == 0:
return 0
# Clamp to per-symbol exposure cap
pv = float(self.Portfolio.TotalPortfolioValue)
max_value = float(self.max_symbol_exposure_pct) * pv
max_qty_exposure = int(max_value / float(self.mid)) if self.mid > 0 else 0
if max_qty_exposure > 0:
qty = max(-max_qty_exposure, min(max_qty_exposure, qty))
# Clamp to available cash for longs in cash accounts
# This avoids over-sizing when fees/volatility push notional above cash
cash = self._available_cash()
if qty > 0 and cash > 0:
fee_rate = float(self.taker_fee_rate)
cost_per_unit = float(self.mid) * (1.0 + fee_rate)
max_qty_cash = int(cash / cost_per_unit)
if max_qty_cash >= 0:
qty = min(qty, max_qty_cash)
# Final cap using QC buying power-aware target sizing
try:
pv2 = float(self.Portfolio.TotalPortfolioValue)
if pv2 > 0 and self.mid > 0:
target = float(qty * self.mid) / pv2
qc_qty = self._calc_order_qty_for_target(self.eth_symbol, target)
if qc_qty is not None and qc_qty != 0 and (qc_qty > 0) == (qty > 0):
sgn = 1 if qty > 0 else -1
qty = sgn * min(abs(qty), abs(int(qc_qty)))
except Exception:
pass
return qty
except Exception:
return qty
def _too_many_open_orders(self) -> bool:
try:
tx = getattr(self, 'Transactions', None)
if tx is None:
return False
open_orders = tx.GetOpenOrders(self.eth_symbol)
return len(open_orders) >= int(self.max_open_orders_per_symbol)
except Exception:
return False
def _in_liquidation_cooldown(self) -> bool:
try:
return self.liquidation_cooldown_until is not None and self.time < self.liquidation_cooldown_until
except Exception:
return False
def _cancel_all_open_orders(self):
try:
tx = getattr(self, 'Transactions', None)
if tx is None:
return
for o in list(tx.GetOpenOrders(self.eth_symbol)):
try:
tx.CancelOrder(o.Id)
except Exception:
pass
except Exception:
pass
def _calc_order_qty_for_target(self, symbol, target_percent):
try:
calc = getattr(self, 'CalculateOrderQuantity', None)
if calc is not None:
return int(calc(symbol, float(target_percent)))
except Exception:
try:
calc2 = getattr(self, 'calculate_order_quantity', None)
if calc2 is not None:
return int(calc2(symbol, float(target_percent)))
return None
except Exception:
return None
def _recent_entry_count(self, lookback_s: int) -> int:
try:
cutoff = self.time - timedelta(seconds=int(lookback_s))
while len(self.entry_fills_last_minute) > 0 and self.entry_fills_last_minute[0] < cutoff:
self.entry_fills_last_minute.popleft()
return len(self.entry_fills_last_minute)
except Exception:
return 0
def _num_open_positions(self) -> int:
try:
holdings = [h for h in self.Portfolio.Values if h.Invested]
return len(holdings)
except Exception:
try:
return 1 if self.portfolio[self.eth_symbol].invested else 0
except Exception:
return 0
# ---------- Order/Logging helpers ----------
def on_order_event(self, order_event: OrderEvent):
try:
if order_event is None:
return
# Detect forced liquidations or broker-side margin closes
msg = ""
try:
msg_val = getattr(order_event, 'Message', None)
if not msg_val:
msg_val = getattr(order_event, 'message', None)
msg = str(msg_val or "")
except Exception:
pass
tag = ""
try:
tx = getattr(self, 'Transactions', None)
ord_id = getattr(order_event, 'OrderId', None)
if ord_id is None:
ord_id = getattr(order_event, 'order_id', None)
order_obj = None
if tx is not None and ord_id is not None:
try:
order_obj = tx.GetOrderById(int(ord_id))
except Exception:
order_obj = None
if order_obj is not None:
tag_val = getattr(order_obj, 'Tag', None)
if not tag_val:
tag_val = getattr(order_obj, 'tag', None)
tag = str(tag_val or "")
except Exception:
pass
if ("Liquidat" in msg) or ("Liquidat" in tag):
self.liquidation_cooldown_until = self.time + timedelta(seconds=int(self.post_liquidation_cooldown_s))
try:
self.debug("risk: liquidation detected; entering cooldown and canceling orders")
except Exception:
pass
try:
self._cancel_pending_entry()
except Exception:
pass
try:
self._cancel_protective_stop()
except Exception:
pass
try:
self._cancel_all_open_orders()
except Exception:
pass
# Initialize risk tracking upon first fill when we have position and no active tracker
if (order_event.status == OrderStatus.FILLED or order_event.status == OrderStatus.PARTIALLY_FILLED):
qty_now = 0
try:
qty_now = int(self.portfolio[self.eth_symbol].quantity)
except Exception:
try:
qty_now = int(self.Portfolio[self.eth_symbol].Quantity)
except Exception:
qty_now = 0
if self.active_trade is None and qty_now != 0:
direction = 1 if qty_now > 0 else -1
entry_price = float(order_event.fill_price) if order_event.fill_price is not None else (self.mid if self.mid is not None else 0.0)
self._init_risk_tracker(direction=direction, entry_price=entry_price, qty=qty_now)
# Record entry fill for throttle
try:
self.entry_fills_last_minute.append(self.time)
except Exception:
pass
self.pending_entry_order_id = None
self.pending_entry_ticket = None
# Log exits
if qty_now == 0 and self.active_trade is not None:
side = "long" if self.active_trade["direction"] > 0 else "short"
exit_px = float(order_event.fill_price) if order_event.fill_price is not None else (self.mid if self.mid is not None else 0.0)
self.csv_trades.append([str(self.time), side, exit_px, 0, "exit"])
try:
direction = self.active_trade.get("direction", 0)
qty_entry = abs(int(self.active_trade.get("qty", 0)))
entry_px = float(self.active_trade.get("entry_price", exit_px))
pnl = (exit_px - entry_px) * (1 if direction > 0 else -1) * qty_entry
self.realized_pnl_window.append((self.time, pnl))
cat = self.active_trade.get("category", None)
if cat is not None and cat in self.category_realized:
self.category_realized[cat].append((self.time, pnl))
except Exception:
pass
try:
self._cancel_protective_stop()
except Exception:
pass
try:
self._cancel_all_open_orders()
except Exception:
pass
self.active_trade = None
except Exception:
pass
# Margin call hooks
def OnMarginCallWarning(self):
try:
self.liquidation_cooldown_until = self.time + timedelta(seconds=int(self.post_liquidation_cooldown_s))
self._cancel_all_open_orders()
self._cancel_pending_entry()
self.debug("risk: margin-call warning; cooldown started")
except Exception:
pass
def on_margin_call_warning(self):
self.OnMarginCallWarning()
def OnMarginCall(self, requests):
try:
self.liquidation_cooldown_until = self.time + timedelta(seconds=int(self.post_liquidation_cooldown_s))
try:
self.liquidate(self.eth_symbol)
except Exception:
pass
self._cancel_all_open_orders()
self._cancel_pending_entry()
self._cancel_protective_stop()
self.debug("risk: margin call executed; cooldown started")
except Exception:
pass
def on_margin_call(self, requests):
self.OnMarginCall(requests)
def _buffer_signal(self, regime: str, signal: str, conf: int):
try:
self.csv_signals.append([str(self.time), regime, signal, conf, self.mid, self.spread, self.ofi, self.cvd, self.microprice])
except Exception:
pass
def _flush_csv(self):
# Persist buffers to QC ObjectStore for download after backtest
try:
if len(self.csv_signals) > 0:
header = "time,regime,signal,conf,mid,spread,ofi,cvd,micro\n"
body = "\n".join([",".join([str(x) for x in row]) for row in self.csv_signals])
self.object_store.save("signals.csv", header + body)
self.csv_signals.clear()
if len(self.csv_trades) > 0:
header = "time,side,price,qty,reason\n"
body = "\n".join([",".join([str(x) for x in row]) for row in self.csv_trades])
self.object_store.save("trades.csv", header + body)
self.csv_trades.clear()
if len(self.csv_markouts) > 0:
header = "time,category,h,pnl\n"
body = "\n".join([",".join([str(x) for x in row]) for row in self.csv_markouts])
self.object_store.save("markouts.csv", header + body)
self.csv_markouts.clear()
except Exception:
# Fall back to Debug logs if ObjectStore unavailable
try:
for row in self.csv_signals:
self.debug("CSV_SIGNAL," + ",".join([str(x) for x in row]))
for row in self.csv_trades:
self.debug("CSV_TRADE," + ",".join([str(x) for x in row]))
for row in self.csv_markouts:
self.debug("CSV_MARKOUT," + ",".join([str(x) for x in row]))
except Exception:
pass
self.csv_signals.clear()
self.csv_trades.clear()
self.csv_markouts.clear()