# Overall Statistics (backtest report)
# ------------------------------------
# Total Orders:                 7555
# Average Win:                  0.01%
# Average Loss:                 -0.01%
# Compounding Annual Return:    83.300%
# Drawdown:                     0.400%
# Expectancy:                   0.482
# Start Equity:                 100000.00
# End Equity:                   116501.46
# Net Profit:                   16.501%
# Sharpe Ratio:                 5.5
# Sortino Ratio:                21.771
# Probabilistic Sharpe Ratio:   100.000%
# Loss Rate:                    24%
# Win Rate:                     76%
# Profit-Loss Ratio:            0.94
# Alpha:                        0.502
# Beta:                         -0.218
# Annual Standard Deviation:    0.088
# Annual Variance:              0.008
# Information Ratio:            2.9
# Tracking Error:               0.137
# Treynor Ratio:                -2.215
# Total Fees:                   $0.00
# Estimated Strategy Capacity:  $0
# Lowest Capacity Asset:        ETHUSD 2XR
# Portfolio Turnover:           1063.36%
# Drawdown Recovery:            1
# region imports
from AlgorithmImports import *
from collections import deque
from datetime import timedelta
import math
# endregion
class Liquiditymonstercore(QCAlgorithm):
    """Tick-level ETHUSD microstructure strategy.

    Quote ticks maintain the top of book (bid/ask, sizes, mid, microprice),
    an order-flow-imbalance (OFI) accumulator and an EWMA of squared mid
    changes. Trade ticks maintain a cumulative volume delta (CVD). Roughly
    once per second the algorithm smooths those features, classifies the
    regime as "range" or "breakout", generates a fade or breakout signal,
    gates it on realised signal markouts versus estimated cost, places an
    entry order and manages any open position (time stop, partial
    take-profit, trailing stop). Signals, trades and markouts are buffered
    and periodically persisted to the ObjectStore as CSV.
    """

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------
    def _get_param(self, key: str, default_value: str) -> str:
        """Return project parameter ``key``, or ``default_value``.

        Uses ``get_parameter`` when available and ``GetParameter`` otherwise.
        A missing getter, a ``None``/empty value or any exception raised by
        the lookup yields ``default_value``.
        """
        try:
            getter = getattr(self, "get_parameter", None) or getattr(self, "GetParameter", None)
            if getter is not None:
                val = getter(key)
                if val is not None and val != "":
                    return val
        except Exception:
            pass
        return default_value

    def _safe_debug(self, message: str) -> None:
        """Emit ``message`` via ``self.debug``; logging must never break trading."""
        try:
            self.debug(message)
        except Exception:
            pass

    def initialize(self):
        """Read parameters, configure the backtest and reset all strategy state."""
        # ---- Configurable params ----
        self.base_symbol = self._get_param("symbol", "ETHUSD")
        start = self._get_param("start", "2024-04-01")
        end = self._get_param("end", "2024-07-01")
        cash = float(self._get_param("cash", "100000"))
        self.maker_fee_rate = float(self._get_param("maker_fee", "0.0002"))
        self.taker_fee_rate = float(self._get_param("taker_fee", "0.0005"))
        self.use_maker_for_fades = (self._get_param("use_maker_for_fades", "true").lower() == "true")
        # Fraction of total portfolio value a single position may use
        # (see _position_size), despite the "value" in the name.
        self.max_position_value = float(self._get_param("max_pos_value", "0.2"))
        self.stop_vol_k = float(self._get_param("stop_vol_k", "3.0"))
        self.time_stop_seconds = int(self._get_param("time_stop_s", "10"))
        self.partial_tp_ratio = float(self._get_param("partial_tp_ratio", "0.5"))
        self.partial_tp_target_sigma = float(self._get_param("partial_tp_sigma", "1.0"))
        self.trail_sigma = float(self._get_param("trail_sigma", "1.0"))

        # Feature smoothing and thresholds
        self.alpha_vol = float(self._get_param("alpha_vol", "0.1"))
        self.alpha_ofi = float(self._get_param("alpha_ofi", "0.2"))
        self.alpha_cvd = float(self._get_param("alpha_cvd", "0.2"))
        self.micro_tilt_threshold = float(self._get_param("micro_tilt_threshold", "0.05"))

        # Safety gates and cadence
        self.max_spread_allowed = float(self._get_param("max_spread", "5.0"))
        self.trade_cooldown_s = int(self._get_param("trade_cooldown_s", "5"))

        # Breakout execution
        self.breakout_sigma_band = float(self._get_param("breakout_sigma_band", "0.5"))
        self.breakout_limit_extra_sigma = float(self._get_param("breakout_limit_extra_sigma", "0.2"))

        # ---- Backtest setup ----
        try:
            y, m, d = [int(x) for x in start.split("-")]
            self.set_start_date(y, m, d)
        except Exception:
            # Unparseable start date: fall back to a fixed date.
            self.set_start_date(2024, 1, 1)
        # Optional end date; an unparseable value leaves the end date unset.
        if end:
            try:
                y2, m2, d2 = [int(x) for x in end.split("-")]
                self.set_end_date(y2, m2, d2)
            except Exception:
                pass
        self.set_cash(cash)

        # Data subscription
        self.eth_security = self.add_crypto(self.base_symbol, Resolution.TICK)
        self.eth_symbol = self.eth_security.symbol

        # Feature state
        self.last_bid = None
        self.last_ask = None
        self.last_bid_size = None
        self.last_ask_size = None
        self.spread = None
        self.mid = None
        self.microprice = None
        # Order Flow Imbalance (top-of-book approximation)
        self.ofi = 0.0
        # Cumulative Volume Delta (trade-aggressor signed volume)
        self.cvd = 0.0
        # Rolling mid history for short-horizon volatility (EWMA-like)
        self.mid_window = deque(maxlen=200)
        self.vol_ewma = 0.0
        # Smoothed flow
        self.ofi_ema = 0.0
        self.cvd_ema = 0.0
        self.last_ofi = 0.0
        # Per-second flow persistence (last N seconds of OFI delta)
        self.ofi_delta_last_seconds = deque(maxlen=10)
        # Top-of-book change tracking to proxy cancel vs replenish
        self.size_change_window = deque(maxlen=20)  # tuples (time, dBid, dAsk)
        # Initialised here so the signal methods never depend on
        # _compute_features having run first.
        self.bid_replenish_score = 0.0
        self.ask_replenish_score = 0.0

        # Markout tracking (price change after a signal, per horizon)
        self.markout_horizons = [1, 5, 10]  # seconds
        self.min_samples_for_gating = 50
        self.pending_markouts = {
            "fade_long": [],
            "fade_short": [],
            "break_long": [],
            "break_short": [],
        }
        self.markout_results = {
            cat: {h: deque(maxlen=400) for h in self.markout_horizons}
            for cat in self.pending_markouts.keys()
        }

        # Execution/risk tracking
        # active_trade keys: direction, entry_time, entry_price, stop_price,
        # time_stop_due, qty, partial_taken, trail_anchor
        self.active_trade = None
        self.pending_entry_order_id = None
        self.pending_entry_ticket = None
        self.pending_entry_expires = None
        self.last_trade_time = None

        # Timing helpers
        self.last_feature_log = None
        self.last_flush = None

        # CSV buffers
        self.csv_signals = []   # rows: time,regime,signal,conf,mid,spread,ofi,cvd,micro
        self.csv_trades = []    # rows: time,side,price,qty,reason
        self.csv_markouts = []  # rows: time,category,h,pnl
        # ObjectStore keys already written during THIS run; later flushes
        # append to them instead of overwriting.
        self._csv_keys_started = set()

        self.debug("Initialized ETH microstructure scaffold")

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------
    def on_data(self, data: Slice):
        """Consume ticks, then run the once-per-second feature/trade cycle."""
        if self.eth_symbol in data.ticks:
            quote_ticks = []
            trade_ticks = []
            for tick in data.ticks[self.eth_symbol]:
                if tick.tick_type == TickType.QUOTE:
                    quote_ticks.append(tick)
                elif tick.tick_type == TickType.TRADE:
                    trade_ticks.append(tick)
            # Quotes first so trades in this slice are signed against the
            # freshest mid.
            for qt in quote_ticks:
                self._update_top_of_book(qt)
            for tt in trade_ticks:
                self._update_trade_flow(tt)

        # Compute features and evaluate signals at most once per second.
        if self.last_feature_log is None or (self.time - self.last_feature_log).total_seconds() >= 1:
            self._compute_features()
            self._update_markouts()
            self._log_features()
            self._evaluate_and_trade()
            self._manage_active_trade()
            self.last_feature_log = self.time

        # Flush CSV buffers every 60 seconds regardless of whether a trade is
        # open; previously this only ran while a position was being managed.
        if self.last_flush is None or (self.time - self.last_flush).total_seconds() >= 60:
            self._flush_csv()
            self.last_flush = self.time

    def on_end_of_algorithm(self):
        """Persist whatever is still buffered when the run ends."""
        self._flush_csv()

    # ------------------------------------------------------------------
    # Feature maintenance
    # ------------------------------------------------------------------
    def _update_top_of_book(self, qt: Tick):
        """Fold one quote tick into top-of-book state, OFI and volatility.

        Quotes with a missing or non-positive side are ignored entirely so
        they cannot corrupt ``last_bid``/``last_ask`` (used as passive limit
        prices) or the OFI accumulator.
        """
        bid = qt.bid_price
        ask = qt.ask_price
        if bid is None or ask is None or bid <= 0 or ask <= 0:
            return
        bid_size = float(qt.bid_size) if qt.bid_size is not None else None
        ask_size = float(qt.ask_size) if qt.ask_size is not None else None

        sizes_known = (
            bid_size is not None
            and ask_size is not None
            and self.last_bid_size is not None
            and self.last_ask_size is not None
        )

        if sizes_known and self.last_bid is not None and self.last_ask is not None:
            # Bid side: a higher or larger bid is added demand (positive).
            if bid > self.last_bid:
                self.ofi += bid_size
            elif bid < self.last_bid:
                self.ofi -= self.last_bid_size
            else:
                self.ofi += (bid_size - self.last_bid_size)
            # Ask side mirrors the bid side with the opposite sign: a lower
            # or larger ask is added supply (negative); a higher ask means
            # the previous ask liquidity was removed (positive).
            if ask < self.last_ask:
                self.ofi -= ask_size
            elif ask > self.last_ask:
                self.ofi += self.last_ask_size
            else:
                self.ofi -= (ask_size - self.last_ask_size)

        # Track size changes for the cancel/replenish proxies.
        if sizes_known:
            self.size_change_window.append(
                (self.time, bid_size - self.last_bid_size, ask_size - self.last_ask_size)
            )

        self.last_bid = bid
        self.last_ask = ask
        self.last_bid_size = bid_size
        self.last_ask_size = ask_size

        self.spread = ask - bid
        self.mid = 0.5 * (ask + bid)
        if bid_size is not None and ask_size is not None and (bid_size + ask_size) > 0:
            # Size-weighted price: leans toward the ask when the bid is heavier.
            self.microprice = (ask * bid_size + bid * ask_size) / (bid_size + ask_size)
        else:
            self.microprice = self.mid

        self.mid_window.append(self.mid)
        if len(self.mid_window) >= 2:
            ret = self.mid_window[-1] - self.mid_window[-2]
            # EWMA of squared per-quote mid changes (price units squared).
            self.vol_ewma = (1 - self.alpha_vol) * self.vol_ewma + self.alpha_vol * (ret * ret)

    def _update_trade_flow(self, tt: Tick):
        """Sign one trade against the current mid and add it to CVD.

        Trades at or above the mid count as buys, trades below as sells.
        """
        if self.mid is None or tt.value is None or tt.quantity is None:
            return
        trade_qty = float(tt.quantity)
        if tt.value >= self.mid:
            self.cvd += trade_qty
        else:
            self.cvd -= trade_qty

    def _compute_features(self):
        """Update the smoothed flows, the OFI delta and the replenish scores."""
        self.ofi_ema = (1 - self.alpha_ofi) * self.ofi_ema + self.alpha_ofi * self.ofi
        self.cvd_ema = (1 - self.alpha_cvd) * self.cvd_ema + self.alpha_cvd * self.cvd

        # OFI change since the previous cycle, kept for the persistence window.
        ofi_delta = self.ofi - self.last_ofi
        self.last_ofi = self.ofi
        self.ofi_delta_last_seconds.append(ofi_delta)

        # Replenish proxy: sum of positive size changes per side over the
        # recorded events of the last 3 seconds.
        cutoff = self.time - timedelta(seconds=3)
        bid_replenish = 0.0
        ask_replenish = 0.0
        for ts, d_bid, d_ask in self.size_change_window:
            if ts < cutoff:
                continue
            if d_bid > 0:
                bid_replenish += d_bid
            if d_ask > 0:
                ask_replenish += d_ask
        self.bid_replenish_score = bid_replenish
        self.ask_replenish_score = ask_replenish

    def _log_features(self):
        """Debug-log the current feature snapshot once a valid book exists."""
        if self.mid is None or self.spread is None or self.microprice is None:
            return
        self.debug(f"t={self.time} mid={self.mid:.2f} spr={self.spread:.2f} micro={self.microprice:.2f} ofi={self.ofi:.2f} cvd={self.cvd:.2f} volEWMA={self.vol_ewma:.6f}")

    def _sigma_price(self) -> float:
        """Return the volatility unit used for stops, targets and bands.

        Square root of ``vol_ewma`` when positive; otherwise the current
        spread, or 1.0 when the spread is missing or zero.
        """
        if self.vol_ewma > 0:
            return math.sqrt(self.vol_ewma)
        return self.spread if self.spread else 1.0

    # ------------------------------------------------------------------
    # Markouts
    # ------------------------------------------------------------------
    def _register_signal_markout(self, category: str):
        """Start tracking the mid-price move after a signal in ``category``."""
        if self.mid is None:
            return
        now = self.time
        self.pending_markouts[category].append({
            "t0": now,
            "p0": self.mid,
            "due_times": {h: now + timedelta(seconds=h) for h in self.markout_horizons},
        })

    def _update_markouts(self):
        """Record the unsigned mid change for every horizon that has matured."""
        if self.mid is None:
            return
        now = self.time
        for category, entries in self.pending_markouts.items():
            i = 0
            while i < len(entries):
                entry = entries[i]
                matured = [h for h, due in entry["due_times"].items() if now >= due]
                for h in matured:
                    # Raw price change; direction is applied when gating.
                    pnl = self.mid - entry["p0"]
                    self.markout_results[category][h].append(pnl)
                    self.csv_markouts.append([str(now), category, h, pnl])
                    del entry["due_times"][h]
                if not entry["due_times"]:
                    entries.pop(i)
                else:
                    i += 1

    def _expected_markout(self, category: str, horizon: int) -> float:
        """Return the mean recorded markout, or 0.0 when none are recorded."""
        arr = self.markout_results[category][horizon]
        if len(arr) == 0:
            return 0.0
        return sum(arr) / float(len(arr))

    def _estimate_trade_cost(self, direction: str, mode: str) -> float:
        """Estimate the per-unit entry cost: half the spread plus the fee.

        The maker fee applies to fades when ``use_maker_for_fades`` is set;
        the taker fee applies otherwise. ``direction`` is currently unused.
        """
        if self.mid is None:
            return 0.0
        half_spread = (self.spread * 0.5) if self.spread is not None else 0.0
        use_maker = (mode == "fade" and self.use_maker_for_fades)
        fee_rate = self.maker_fee_rate if use_maker else self.taker_fee_rate
        return half_spread + fee_rate * self.mid

    def _markout_category(self, regime: str, direction: str):
        """Map (regime, direction) to its ``(markout category, cost mode)``."""
        mode = "fade" if regime == "range" else "break"
        suffix = "long" if direction == "long" else "short"
        return f"{mode}_{suffix}", mode

    def _signed_expected_markout(self, category: str, direction: str) -> float:
        """Average of the 1s and 5s expected markouts, signed by direction."""
        sign = 1.0 if direction == "long" else -1.0
        exp1 = sign * self._expected_markout(category, 1)
        exp5 = sign * self._expected_markout(category, 5)
        return 0.5 * exp1 + 0.5 * exp5

    def _passes_markout_gating(self, regime: str, direction: str) -> bool:
        """Return True when the signal's expected markout beats its cost.

        Until ``min_samples_for_gating`` observations exist for the category
        (summed over all horizons) every signal passes.
        """
        category, mode = self._markout_category(regime, direction)
        total_samples = sum(len(self.markout_results[category][h]) for h in self.markout_horizons)
        if total_samples < self.min_samples_for_gating:
            return True
        exp = self._signed_expected_markout(category, direction)
        return exp > self._estimate_trade_cost(direction, mode)

    # ------------------------------------------------------------------
    # Signal + execution
    # ------------------------------------------------------------------
    def _evaluate_and_trade(self):
        """Generate a signal for the current regime, gate it and place an entry."""
        # Expire stale entries first: the gates below return early on most
        # cycles, which previously left resting orders alive past their
        # expiry.
        self._expire_pending_entry()

        regime = self._classify_regime()
        if regime == "range":
            signal = self._absorption_fade_signal()
        else:
            signal = self._sweep_breakout_signal()

        if signal is None:
            self._safe_debug(f"skip: no signal (regime={regime})")
            return

        # Every signal is tracked for markouts, whether or not it is traded.
        category, mode = self._markout_category(regime, signal)
        self._register_signal_markout(category)

        if not self._passes_markout_gating(regime, signal):
            exp = self._signed_expected_markout(category, signal)
            cost = self._estimate_trade_cost(signal, mode)
            self._safe_debug(f"skip: markout gating exp={exp:.4f} < cost={cost:.4f} (regime={regime}, dir={signal})")
            return

        # Safety gates
        if self.spread is not None and self.spread > self.max_spread_allowed:
            self._safe_debug(f"skip: spread {self.spread:.2f} > max {self.max_spread_allowed:.2f}")
            return
        if self.last_trade_time is not None:
            waited = (self.time - self.last_trade_time).total_seconds()
            if waited < self.trade_cooldown_s:
                self._safe_debug(f"skip: cooldown {waited:.0f}s/{self.trade_cooldown_s}s")
                return

        if regime == "range":
            if signal == "long":
                self._enter_long_passive()
            elif signal == "short":
                self._enter_short_passive()
        else:
            if signal == "long":
                self._enter_long_breakout()
            elif signal == "short":
                self._enter_short_breakout()

    def _classify_regime(self) -> str:
        """Return "breakout" when volatility and OFI persistence are high, else "range"."""
        if self.spread is None or self.vol_ewma is None:
            return "range"
        # Persistence of the OFI delta sign across the recorded seconds.
        pos = sum(1 for x in self.ofi_delta_last_seconds if x > 0)
        neg = sum(1 for x in self.ofi_delta_last_seconds if x < 0)
        ofi_persistence = abs(pos - neg)
        # Variance is compared against the squared spread.
        if self.vol_ewma > 2.0 * (self.spread * self.spread) and ofi_persistence >= 6:
            return "breakout"
        return "range"

    def _absorption_fade_signal(self):
        """Return "long", "short" or None for the range (fade) regime.

        Each side needs at least two of three confirmations: opposing flow
        that is not moving price, replenishment on the defended side, and a
        microprice tilt of at least ``micro_tilt_threshold``.
        """
        if self.mid is None or self.microprice is None or self.spread is None:
            return None

        micro_tilt_up = (self.microprice - self.mid) >= self.micro_tilt_threshold
        micro_tilt_dn = (self.mid - self.microprice) >= self.micro_tilt_threshold

        # "No progress": the mid moved less than max(1.0, spread) across the
        # rolling window. With fewer than two mids, flow alone decides.
        if len(self.mid_window) >= 2:
            stalled = abs(self.mid_window[-1] - self.mid_window[0]) < max(1.0, self.spread)
        else:
            stalled = True
        sell_flow_no_progress = self.cvd_ema < 0 and stalled
        buy_flow_no_progress = self.cvd_ema > 0 and stalled

        bid_replenish = self.bid_replenish_score > 0
        ask_replenish = self.ask_replenish_score > 0

        conf_long = sum([sell_flow_no_progress, bid_replenish, micro_tilt_up])
        conf_short = sum([buy_flow_no_progress, ask_replenish, micro_tilt_dn])

        spread_ok = self.spread <= self.max_spread_allowed
        if spread_ok and conf_long >= 2:
            self._buffer_signal("range", "long", conf_long)
            return "long"
        if spread_ok and conf_short >= 2:
            self._buffer_signal("range", "short", conf_short)
            return "short"
        return None

    def _sweep_breakout_signal(self):
        """Return "long", "short" or None for the breakout regime.

        Each side needs at least two of three confirmations: a persistent
        OFI sweep IN THAT DIRECTION, no refill on the side being swept, and
        the microprice leaning the same way. The sweep confirmation used to
        be direction-agnostic, so persistent selling could confirm a long.
        """
        if self.mid is None or self.microprice is None:
            return None

        # Approximations without full L2.
        deltas = self.ofi_delta_last_seconds
        last_delta = deltas[-1] if len(deltas) > 0 else 0.0
        up_seconds = sum(1 for x in deltas if x > 0)
        dn_seconds = sum(1 for x in deltas if x < 0)
        sweep_up = up_seconds >= 7 and last_delta > 0
        sweep_dn = dn_seconds >= 7 and last_delta < 0

        no_refill_long = self.ask_replenish_score <= 0
        no_refill_short = self.bid_replenish_score <= 0
        micro_lean_up = self.microprice > self.mid
        micro_lean_dn = self.microprice < self.mid

        conf_long = sum([sweep_up, no_refill_long, micro_lean_up])
        conf_short = sum([sweep_dn, no_refill_short, micro_lean_dn])

        if conf_long >= 2:
            self._buffer_signal("breakout", "long", conf_long)
            return "long"
        if conf_short >= 2:
            self._buffer_signal("breakout", "short", conf_short)
            return "short"
        return None

    # ------------------------------------------------------------------
    # Execution + risk helpers
    # ------------------------------------------------------------------
    def _track_pending_entry(self, ticket, ttl_seconds: int) -> bool:
        """Remember ``ticket`` as the pending entry; return False if it is unusable."""
        if ticket is None or ticket.order_id <= 0:
            return False
        self.pending_entry_order_id = ticket.order_id
        self.pending_entry_ticket = ticket
        self.pending_entry_expires = self.time + timedelta(seconds=ttl_seconds)
        return True

    def _enter_long_passive(self):
        """Rest a buy limit at the last bid (3 s expiry) unless already long."""
        holding = self.portfolio[self.eth_symbol]
        if holding.invested and holding.quantity > 0:
            return
        qty = self._position_size(direction=1)
        if qty <= 0:
            return
        limit_price = self.last_bid
        if limit_price is None:
            return
        if self._has_open_entry():
            self._reprice_entry_if_needed("long", limit_price)
            return
        ticket = self.limit_order(self.eth_symbol, qty, limit_price)
        if self._track_pending_entry(ticket, 3):
            self._safe_debug(f"entry: passive long qty={qty} px={limit_price:.2f}")

    def _enter_short_passive(self):
        """Rest a sell limit at the last ask (3 s expiry) unless already short."""
        holding = self.portfolio[self.eth_symbol]
        if holding.invested and holding.quantity < 0:
            return
        qty = self._position_size(direction=-1)
        if qty >= 0:
            return
        limit_price = self.last_ask
        if limit_price is None:
            return
        if self._has_open_entry():
            self._reprice_entry_if_needed("short", limit_price)
            return
        ticket = self.limit_order(self.eth_symbol, qty, limit_price)
        if self._track_pending_entry(ticket, 3):
            self._safe_debug(f"entry: passive short qty={qty} px={limit_price:.2f}")

    def _enter_long_breakout(self):
        """Place a buy stop-limit above the mid (5 s expiry) unless already long."""
        holding = self.portfolio[self.eth_symbol]
        if holding.invested and holding.quantity > 0:
            return
        qty = self._position_size(direction=1)
        if qty <= 0 or self.mid is None:
            return
        sigma_price = self._sigma_price()
        stop_price = self.mid + self.breakout_sigma_band * sigma_price
        limit_price = stop_price + self.breakout_limit_extra_sigma * sigma_price
        if self._has_open_entry():
            # Keep an aligned pending order; cancel it when it has drifted.
            self._reprice_entry_if_needed("long_break", limit_price, stop_price)
            return
        ticket = self.stop_limit_order(self.eth_symbol, qty, stop_price, limit_price)
        if self._track_pending_entry(ticket, 5):
            self._safe_debug(f"entry: breakout long qty={qty} stop={stop_price:.2f} limit={limit_price:.2f}")

    def _enter_short_breakout(self):
        """Place a sell stop-limit below the mid (5 s expiry) unless already short."""
        holding = self.portfolio[self.eth_symbol]
        if holding.invested and holding.quantity < 0:
            return
        qty = self._position_size(direction=-1)
        if qty >= 0 or self.mid is None:
            return
        sigma_price = self._sigma_price()
        stop_price = self.mid - self.breakout_sigma_band * sigma_price
        limit_price = stop_price - self.breakout_limit_extra_sigma * sigma_price
        if self._has_open_entry():
            self._reprice_entry_if_needed("short_break", limit_price, stop_price)
            return
        ticket = self.stop_limit_order(self.eth_symbol, qty, stop_price, limit_price)
        if self._track_pending_entry(ticket, 5):
            self._safe_debug(f"entry: breakout short qty={qty} stop={stop_price:.2f} limit={limit_price:.2f}")

    def _init_risk_tracker(self, direction: int, entry_price: float, qty: int):
        """Create the ``active_trade`` record for a freshly opened position.

        The initial stop sits ``stop_vol_k`` sigmas from the entry and the
        time stop ``time_stop_seconds`` after now. Also starts the trade
        cooldown and buffers an "entry" CSV row.
        """
        stop_distance = self.stop_vol_k * self._sigma_price()
        if direction > 0:
            stop_price = entry_price - stop_distance
        else:
            stop_price = entry_price + stop_distance
        self.active_trade = {
            "direction": direction,
            "entry_time": self.time,
            "entry_price": entry_price,
            "stop_price": stop_price,
            "time_stop_due": self.time + timedelta(seconds=self.time_stop_seconds),
            "qty": qty,
            "partial_taken": False,
            "trail_anchor": entry_price,
        }
        self.last_trade_time = self.time
        side = "long" if direction > 0 else "short"
        self.csv_trades.append([str(self.time), side, entry_price, qty, "entry"])

    def _manage_active_trade(self):
        """Apply the time stop, partial take-profit and trailing stop."""
        trade = self.active_trade
        if not trade or self.mid is None:
            return
        direction = trade["direction"]
        entry_price = trade["entry_price"]

        # Time stop
        if self.time >= trade["time_stop_due"]:
            self.liquidate(self.eth_symbol)
            self.active_trade = None
            return

        sigma_price = self._sigma_price()

        # Partial take-profit, taken at most once per trade.
        if not trade["partial_taken"]:
            tp_distance = self.partial_tp_target_sigma * sigma_price
            if direction > 0:
                target_hit = self.mid >= entry_price + tp_distance
            else:
                target_hit = self.mid <= entry_price - tp_distance
            if target_hit:
                # Size from the live position, not the quantity stored at the
                # first (possibly partial) fill, and never more than is held:
                # the old max(1, ...) floor could flip a sub-unit position.
                held = abs(float(self.portfolio[self.eth_symbol].quantity))
                if held <= 0:
                    self.active_trade = None
                    return
                tp_qty = min(held, int(max(1, held * self.partial_tp_ratio)))
                self.market_order(self.eth_symbol, -tp_qty if direction > 0 else tp_qty)
                # A synchronous fill that flattens the position makes
                # on_order_event clear the tracker; stop managing it then.
                if self.active_trade is not trade:
                    return
                trade["partial_taken"] = True
                trade["trail_anchor"] = self.mid

        # Mid-based trailing stop; the stop only ever moves in the trade's favour.
        # NOTE(review): on the first pass this tightens the initial
        # stop_vol_k stop to trail_sigma around the entry - confirm intended.
        trail_step = self.trail_sigma * sigma_price
        if direction > 0:
            if self.mid > trade["trail_anchor"]:
                trade["trail_anchor"] = self.mid
            new_stop = trade["trail_anchor"] - trail_step
            if new_stop > trade["stop_price"]:
                trade["stop_price"] = new_stop
            stopped_out = self.mid <= trade["stop_price"]
        else:
            if self.mid < trade["trail_anchor"]:
                trade["trail_anchor"] = self.mid
            new_stop = trade["trail_anchor"] + trail_step
            if new_stop < trade["stop_price"]:
                trade["stop_price"] = new_stop
            stopped_out = self.mid >= trade["stop_price"]

        # Price-based hard stop
        if stopped_out:
            self.liquidate(self.eth_symbol)
            self.active_trade = None

    # ------------------------------------------------------------------
    # Entry order management
    # ------------------------------------------------------------------
    def _has_open_entry(self) -> bool:
        """Return True when an entry order is still working.

        Checks the stored ticket first, then falls back to any open order on
        the symbol. Any error is treated as "no open entry".
        """
        try:
            if self.pending_entry_ticket is not None:
                ord_status = self.pending_entry_ticket.status
                if ord_status in [OrderStatus.New, OrderStatus.Submitted, OrderStatus.PartiallyFilled]:
                    return True
            open_orders = self.Transactions.GetOpenOrders(self.eth_symbol)
            return len(open_orders) > 0
        except Exception:
            return False

    def _cancel_pending_entry(self):
        """Cancel the pending entry (ticket first, order id as fallback) and forget it."""
        try:
            if self.pending_entry_ticket is not None:
                self.pending_entry_ticket.cancel()
        except Exception:
            try:
                if self.pending_entry_order_id is not None:
                    self.Transactions.CancelOrder(self.pending_entry_order_id)
            except Exception:
                pass
        self.pending_entry_ticket = None
        self.pending_entry_order_id = None
        self.pending_entry_expires = None

    def _expire_pending_entry(self):
        """Cancel the pending entry once its expiry time has passed."""
        try:
            if (
                self.pending_entry_expires is not None
                and self.time >= self.pending_entry_expires
                and self._has_open_entry()
            ):
                self.debug("expire: cancel stale entry order")
                self._cancel_pending_entry()
        except Exception:
            pass

    def _ticket_price(self, field):
        """Return ``field`` of the pending ticket as a float, or None.

        The ticket getter returns the value itself; the lookup fails for
        fields the order type does not have, which is reported as None.
        """
        ticket = self.pending_entry_ticket
        if ticket is None:
            return None
        try:
            return float(ticket.get(field))
        except Exception:
            return None

    def _reprice_entry_if_needed(self, side: str, desired_limit: float, desired_stop: float = None):
        """Cancel the pending entry when its prices drifted from the desired ones.

        An order is left alone when its limit (and, if ``desired_stop`` is
        given, its stop) is within 10% of the spread of the desired price. A
        cancelled order is re-placed by a later cycle. ``side`` is currently
        unused.
        """
        try:
            if not self._has_open_entry():
                return
            tolerance = (self.spread or 1.0) * 0.1

            # Previously the getter's result was treated as a response
            # object, so the price always read as unknown and every call
            # cancelled the order.
            current_limit = self._ticket_price(OrderField.LIMIT_PRICE)
            limit_aligned = current_limit is not None and abs(current_limit - desired_limit) <= tolerance

            stop_aligned = True
            if desired_stop is not None:
                current_stop = self._ticket_price(OrderField.STOP_PRICE)
                stop_aligned = current_stop is not None and abs(current_stop - desired_stop) <= tolerance

            if limit_aligned and stop_aligned:
                return

            self._cancel_pending_entry()
            self._safe_debug("reprice: canceled pending entry; will re-place next cycle")
        except Exception:
            pass

    def _position_size(self, direction: int) -> int:
        """Return the signed whole-unit quantity for a new position.

        Size is ``max_position_value`` times total portfolio value, divided
        by the mid and truncated to an integer; 0 when no valid mid exists.
        """
        try:
            pv = float(self.Portfolio.TotalPortfolioValue)
        except Exception:
            pv = float(getattr(getattr(self, 'portfolio', {}), 'total_portfolio_value', 0.0))
        max_value = float(self.max_position_value) * pv
        price = float(self.mid) if self.mid is not None else None
        if price is None or price <= 0.0:
            return 0
        qty = max(0, int(max_value / price))
        return qty * (1 if direction > 0 else -1)

    # ------------------------------------------------------------------
    # Order/Logging helpers
    # ------------------------------------------------------------------
    def on_order_event(self, order_event: OrderEvent):
        """Start risk tracking on the first fill and log exits when flat."""
        try:
            if order_event is None:
                return
            is_fill = (
                order_event.status == OrderStatus.Filled
                or order_event.status == OrderStatus.PartiallyFilled
            )
            if not is_fill:
                return

            qty_now = self.portfolio[self.eth_symbol].quantity

            # First fill with a position and no tracker: start risk tracking.
            if self.active_trade is None and qty_now != 0:
                direction = 1 if qty_now > 0 else -1
                if order_event.fill_price is not None:
                    entry_price = float(order_event.fill_price)
                else:
                    entry_price = self.mid if self.mid is not None else 0.0
                self._init_risk_tracker(direction=direction, entry_price=entry_price, qty=qty_now)
                self.pending_entry_order_id = None

            # Flat again while a trade was tracked: log the exit.
            if qty_now == 0 and self.active_trade is not None:
                side = "long" if self.active_trade["direction"] > 0 else "short"
                exit_price = float(order_event.fill_price) if order_event.fill_price is not None else 0.0
                self.csv_trades.append([str(self.time), side, exit_price, 0, "exit"])
                self.active_trade = None
        except Exception as err:
            # Report instead of silently discarding; never raise from here.
            self._safe_debug(f"on_order_event error: {err!r}")

    def _buffer_signal(self, regime: str, signal: str, conf: int):
        """Buffer one signal row for the next CSV flush."""
        self.csv_signals.append([
            str(self.time), regime, signal, conf,
            self.mid, self.spread, self.ofi, self.cvd, self.microprice,
        ])

    def _append_csv(self, key: str, header: str, rows) -> None:
        """Write ``rows`` under ``key``, appending to data written earlier in this run.

        The first write of a run replaces whatever an earlier run left under
        ``key``. NOTE: appending is read-modify-write, so the cost of a flush
        grows with the file; very long runs may need a larger flush interval.
        """
        body = "\n".join(",".join(str(x) for x in row) for row in rows)
        if key in self._csv_keys_started and self.object_store.contains_key(key):
            content = self.object_store.read(key).rstrip("\n") + "\n" + body
        else:
            content = header + body
        self.object_store.save(key, content)
        self._csv_keys_started.add(key)

    def _flush_csv(self):
        """Persist the CSV buffers to the ObjectStore and empty them.

        Each flush used to overwrite the stored file, so only the last batch
        survived; rows are now appended. If the ObjectStore fails, the
        remaining rows are dumped to the debug log and the buffers cleared.
        """
        buffers = (
            ("signals.csv", "time,regime,signal,conf,mid,spread,ofi,cvd,micro\n", self.csv_signals),
            ("trades.csv", "time,side,price,qty,reason\n", self.csv_trades),
            ("markouts.csv", "time,category,h,pnl\n", self.csv_markouts),
        )
        try:
            for key, header, rows in buffers:
                if len(rows) > 0:
                    self._append_csv(key, header, rows)
                    rows.clear()
        except Exception:
            # Fall back to Debug logs if ObjectStore is unavailable.
            try:
                for row in self.csv_signals:
                    self.debug("CSV_SIGNAL," + ",".join([str(x) for x in row]))
                for row in self.csv_trades:
                    self.debug("CSV_TRADE," + ",".join([str(x) for x in row]))
                for row in self.csv_markouts:
                    self.debug("CSV_MARKOUT," + ",".join([str(x) for x in row]))
            except Exception:
                pass
            self.csv_signals.clear()
            self.csv_trades.clear()
            self.csv_markouts.clear()