| Overall Statistics |
|
Total Orders 33 Average Win 6.03% Average Loss -2.41% Compounding Annual Return 12.758% Drawdown 11.900% Expectancy 1.100 Start Equity 100000 End Equity 143364.47 Net Profit 43.364% Sharpe Ratio 0.773 Sortino Ratio 0.822 Probabilistic Sharpe Ratio 35.354% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 2.50 Alpha 0 Beta 0 Annual Standard Deviation 0.108 Annual Variance 0.012 Information Ratio 0.857 Tracking Error 0.108 Treynor Ratio 0 Total Fees $56.43 Estimated Strategy Capacity $9400000000.00 Lowest Capacity Asset MES Y4D62XFM9IPT Portfolio Turnover 1.93% Drawdown Recovery 328 |
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from numba import jit
# ==============================================================================
# MACHINE-CODE MATH ENGINE
# ==============================================================================
@jit(nopython=True)
def jit_hawkes(timestamps, alpha=0.1, beta=0.5, baseline_mu=0.01):
n = len(timestamps)
if n < 2: return baseline_mu
intensity = np.zeros(n)
intensity[0] = baseline_mu + alpha
for i in range(1, n):
delta_t = timestamps[i] - timestamps[i-1]
intensity[i] = baseline_mu + (intensity[i-1] - baseline_mu) * np.exp(-beta * delta_t) + alpha
return intensity[-1]
@jit(nopython=True)
def jit_linregress(x, y):
n = len(x)
sum_x = np.sum(x)
sum_y = np.sum(y)
sum_xx = np.sum(x * x)
sum_xy = np.sum(x * y)
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x)
return slope
@jit(nopython=True)
def jit_hurst(prices, max_lag=20):
n = len(prices)
if n < 100 or prices[0] == 0: return 0.5
returns = np.empty(n - 1)
for i in range(n - 1):
returns[i] = np.log(prices[i+1] / prices[i])
max_entries = max_lag - 2
rs_values = np.empty(max_entries)
valid_lags = np.empty(max_entries)
count = 0
for lag in range(2, max_lag):
num_chunks = len(returns) // lag
if num_chunks == 0: continue
rs_sum = 0.0
for c in range(num_chunks):
chunk = returns[c*lag : (c+1)*lag]
mean = np.mean(chunk)
cum_dev = 0.0
max_dev = 0.0
min_dev = 0.0
variance = 0.0
for val in chunk:
dev = val - mean
cum_dev += dev
if cum_dev > max_dev: max_dev = cum_dev
if cum_dev < min_dev: min_dev = cum_dev
variance += dev * dev
r = max_dev - min_dev
s = np.sqrt(variance / lag)
if s == 0: s = 1e-8
rs_sum += (r / s)
rs_values[count] = rs_sum / num_chunks
valid_lags[count] = lag
count += 1
if count == 0: return 0.5
x = np.log(valid_lags[:count])
y = np.log(rs_values[:count])
return jit_linregress(x, y)
@jit(nopython=True)
def jit_vpin(buy_vol_array, sell_vol_array, bucket_size, window=50):
n = len(buy_vol_array)
if n < window: return 0.0
raw_vpin = np.abs(buy_vol_array - sell_vol_array) / bucket_size
current_vpin = 0.0
for i in range(n - window, n):
current_vpin += raw_vpin[i]
return current_vpin / window
# ==============================================================================
# MASTER MATRIX CONTROLLER
# ==============================================================================
class MedallionCapitvioMatrixAlgo(QCAlgorithm):
def initialize(self):
# SET YOUR TARGET DATES HERE FOR ISOLATED REGIME TESTING
self.set_start_date(2020, 1, 1)
self.set_end_date(2022, 12, 31)
self.set_cash(100000)
self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
# FINAL ULTIMATE CUT: Exclusively Micro S&P 500
basket = [
Futures.Indices.MICRO_SP_500_E_MINI
]
self.symbol_data = {}
self.symbol_to_ticker = {}
self.set_warm_up(timedelta(days=60))
for ticker in basket:
future = self.add_future(ticker, Resolution.SECOND)
future.set_filter(0, 90)
self.symbol_data[future.symbol] = SymbolData(self, future)
self.symbol_to_ticker[future.symbol] = ticker
self.CORRELATION_GROUPS = []
self.MAX_DAILY_DRAWDOWN = 0.04
self.high_water_mark = self.portfolio.total_portfolio_value
self.start_of_day_equity = self.portfolio.total_portfolio_value
self.daily_kill_switch_tripped = False
self.algo_id = "CAPITVIO_MATRIX_037_APEX_SP500"
self.schedule.on(self.date_rules.every_day(),
self.time_rules.at(0, 5),
self.reset_daily_metrics)
self.schedule.on(self.date_rules.month_start(),
self.time_rules.at(0, 10),
self.run_walk_forward_optimization)
def reset_daily_metrics(self):
self.start_of_day_equity = self.portfolio.total_portfolio_value
self.daily_kill_switch_tripped = False
if self.portfolio.total_portfolio_value > self.high_water_mark:
self.high_water_mark = self.portfolio.total_portfolio_value
def run_walk_forward_optimization(self):
if self.is_warming_up: return
self.debug(f"[{self.time}] Executing Monthly Walk-Forward Parameter Calibration...")
for symbol, sd in self.symbol_data.items():
sd.recalibrate_wfo_parameters()
def get_correlation_scalar(self, symbol, side):
return 1.0
def on_data(self, data):
if not self.daily_kill_switch_tripped:
current_drawdown = (self.start_of_day_equity - self.portfolio.total_portfolio_value) / self.start_of_day_equity
if current_drawdown >= self.MAX_DAILY_DRAWDOWN:
self.debug(f"MATRIX KILL SWITCH TRIPPED. Global Drawdown hit {current_drawdown:.2%}")
self.daily_kill_switch_tripped = True
self.liquidate()
self.transactions.cancel_open_orders()
return
if self.daily_kill_switch_tripped: return
for symbol, sd in self.symbol_data.items():
if data.bars.contains_key(symbol):
sd.process_trade_bar(data.bars[symbol])
if sd.entry_ticket is not None:
sd.check_adverse_selection()
mapped = sd.future.mapped
if mapped is not None:
if sd.last_known_mapped is not None and sd.last_known_mapped != mapped:
self.debug(f"CONTRACT ROLL [{str(symbol)}]: {sd.last_known_mapped} → {mapped}")
sd.handle_contract_roll(mapped)
sd.last_known_mapped = mapped
if mapped and self.portfolio[mapped].invested:
sd.update_trailing_stop()
sd.check_macro_decay(mapped)
def on_order_event(self, order_event):
symbol = order_event.symbol
if symbol in self.symbol_data:
self.symbol_data[symbol].handle_order_event(order_event)
# ==============================================================================
# INDEPENDENT ASSET BRAIN (SP500 MASTER MATRIX)
# ==============================================================================
class SymbolData:
def __init__(self, algo, future):
self.algo = algo
self.symbol = future.symbol
self.future = future
self.hourly = TradeBarConsolidator(timedelta(minutes=60))
self.hourly.data_consolidated += self.on_hourly_bar
self.algo.subscription_manager.add_consolidator(self.symbol, self.hourly)
self.h_std = StandardDeviation(20); self.h_atr = AverageTrueRange(20)
self.algo.register_indicator(self.symbol, self.h_std, self.hourly)
self.algo.register_indicator(self.symbol, self.h_atr, self.hourly)
self.hourly_closes = RollingWindow[float](250)
self.current_hurst = 0.5
# ELITE MACRO BRAIN
self.HURST_THRESHOLD = 0.50
self.emac_spans = [4, 8, 16]; self.slow_spans = [16, 32, 64]
self.ema_dict = {}
for span in (self.emac_spans + self.slow_spans):
self.ema_dict[span] = ExponentialMovingAverage(span)
self.algo.register_indicator(self.symbol, self.ema_dict[span], self.hourly)
self.FORECAST_SCALAR_BY_SPAN = {4: 8.53, 8: 5.95, 16: 4.10}
self.ABS_FORECAST_CAP = 20
self.macro_bias = 0
self.volatility_regime = "WARMUP"
self.continuous_forecast = 0.0
self.m15 = TradeBarConsolidator(timedelta(minutes=15))
self.m15.data_consolidated += self.on_m15_bar
self.algo.subscription_manager.add_consolidator(self.symbol, self.m15)
self.m15_atr = AverageTrueRange(14)
self.algo.register_indicator(self.symbol, self.m15_atr, self.m15)
self.last_m15_low = 0.0; self.last_m15_high = 0.0
self.m5 = TradeBarConsolidator(timedelta(minutes=5))
self.m5.data_consolidated += self.on_m5_bar
self.algo.subscription_manager.add_consolidator(self.symbol, self.m5)
self.m5_atr = AverageTrueRange(14)
self.algo.register_indicator(self.symbol, self.m5_atr, self.m5)
self.m5_bb = BollingerBands(20, 2.0)
self.algo.register_indicator(self.symbol, self.m5_bb, self.m5)
self.vol_window = RollingWindow[float](50)
self.current_m5_delta = 0; self.last_trade_price = 0.0
self.delta_window = RollingWindow[float](5)
self.vpin_bucket_size = 5000
self.vpin_buy_window = RollingWindow[float](50); self.vpin_sell_window = RollingWindow[float](50)
self.current_bucket_buy_vol = 0; self.current_bucket_sell_vol = 0
self.is_market_toxic = False
self.tick_times_ms = RollingWindow[float](100)
self.wfo_historical_vpin = RollingWindow[float](720)
self.wfo_historical_hawkes = RollingWindow[float](720)
self.wfo_historical_delta = RollingWindow[float](720)
self.wfo_historical_atr = RollingWindow[float](720)
self.baseline_atr = 0.0
self.vpin_toxicity_threshold = 0.75
self.HAWKES_EXCITATION_THRESHOLD = 5.0
self.DYNAMIC_DELTA_THRESHOLD = 150.0
self.absorption_zone = None; self.displaced = False
self.entry_ticket = None; self.stop_ticket = None
self.baseline_bid_liquidity = 0
self.pending_side = None; self.pending_qty = 0; self.pending_stop_dist = 0.0
self.trade_direction = None; self.entry_price = 0.0
self.last_known_mapped = None
self.pending_regime = None
self.entry_regime = None
self.pending_target_price = 0.0
self.active_target_price = 0.0
self.initial_stop_dist = 0.0
self.FORECAST_DECAY_THRESHOLD = 4.0
self.BASE_RISK_PCT = 0.004
self.ATR_MULT = 1.75
self.DISPLACEMENT_MULT = 0.5
self.RETEST_VOL_MAX = 0.85
def calculate_vwmp(self, best_bid_price, best_bid_vol, best_ask_price, best_ask_vol):
total_vol = best_bid_vol + best_ask_vol
if total_vol == 0:
return (best_bid_price + best_ask_price) / 2 if best_bid_price > 0 else 0
return best_bid_price * (best_ask_vol / total_vol) + best_ask_price * (best_bid_vol / total_vol)
def recalibrate_wfo_parameters(self):
if not self.wfo_historical_vpin.is_ready or not self.wfo_historical_hawkes.is_ready or not self.wfo_historical_delta.is_ready:
return
vpin_array = np.array(list(self.wfo_historical_vpin), dtype=np.float64)
hawkes_array = np.array(list(self.wfo_historical_hawkes), dtype=np.float64)
delta_array = np.array(list(self.wfo_historical_delta), dtype=np.float64)
# LENIENT MICRO BRAIN (VPIN/Delta 70th Percentile, Hawkes Dormant at 85)
new_vpin_thresh = np.percentile(vpin_array, 70)
new_hawkes_thresh = np.percentile(hawkes_array, 85)
new_delta_thresh = np.percentile(delta_array, 70)
self.vpin_toxicity_threshold = max(0.15, new_vpin_thresh)
self.HAWKES_EXCITATION_THRESHOLD = max(0.50, new_hawkes_thresh)
self.DYNAMIC_DELTA_THRESHOLD = max(10.0, new_delta_thresh)
if self.wfo_historical_atr.is_ready:
atr_array = np.array(list(self.wfo_historical_atr), dtype=np.float64)
self.baseline_atr = np.median(atr_array)
self.algo.debug(f"WFO [{str(self.symbol)}]: VPIN: {self.vpin_toxicity_threshold:.2f} | Delta: {self.DYNAMIC_DELTA_THRESHOLD:.0f} | Base Vol: {self.baseline_atr:.2f}")
def process_trade_bar(self, bar):
self.tick_times_ms.add(bar.end_time.timestamp() * 1000)
if bar.close > self.last_trade_price:
self.current_m5_delta += bar.volume
self.current_bucket_buy_vol += bar.volume
elif bar.close < self.last_trade_price:
self.current_m5_delta -= bar.volume
self.current_bucket_sell_vol += bar.volume
else:
self.current_bucket_buy_vol += bar.volume / 2
self.current_bucket_sell_vol += bar.volume / 2
self.last_trade_price = bar.close
if (self.current_bucket_buy_vol + self.current_bucket_sell_vol) >= self.vpin_bucket_size:
self.vpin_buy_window.add(self.current_bucket_buy_vol)
self.vpin_sell_window.add(self.current_bucket_sell_vol)
self.current_bucket_buy_vol = 0
self.current_bucket_sell_vol = 0
if self.vpin_buy_window.is_ready:
buy_vol_array = np.array(list(self.vpin_buy_window)[::-1], dtype=np.float64)
sell_vol_array = np.array(list(self.vpin_sell_window)[::-1], dtype=np.float64)
self.is_market_toxic = jit_vpin(buy_vol_array, sell_vol_array, float(self.vpin_bucket_size)) > self.vpin_toxicity_threshold
def on_hourly_bar(self, sender, bar):
self.hourly_closes.add(bar.close)
if self.h_atr.is_ready:
self.wfo_historical_atr.add(self.h_atr.current.value)
if self.vpin_buy_window.is_ready:
buy_vol_array = np.array(list(self.vpin_buy_window)[::-1], dtype=np.float64)
sell_vol_array = np.array(list(self.vpin_sell_window)[::-1], dtype=np.float64)
current_vpin = jit_vpin(buy_vol_array, sell_vol_array, float(self.vpin_bucket_size))
self.wfo_historical_vpin.add(current_vpin)
if self.tick_times_ms.is_ready:
timestamps = np.array(list(self.tick_times_ms)[::-1], dtype=np.float64)
self.wfo_historical_hawkes.add(jit_hawkes(timestamps))
if not self.hourly_closes.is_ready or not self.h_std.is_ready or not self.h_atr.is_ready: return
for ema in self.ema_dict.values():
if not ema.is_ready: return
prices_array = np.array(list(self.hourly_closes)[::-1], dtype=np.float64)
self.current_hurst = jit_hurst(prices_array)
capped_forecast_by_span = {}
hourly_risk_price_terms = self.h_atr.current.value
if hourly_risk_price_terms == 0: return
for i, fast_span in enumerate(self.emac_spans):
slow_span = self.slow_spans[i]
fast_ema = self.ema_dict[fast_span].current.value
slow_ema = self.ema_dict[slow_span].current.value
risk_adjusted_ewmac = (fast_ema - slow_ema) / hourly_risk_price_terms
scaled_forecast = risk_adjusted_ewmac * self.FORECAST_SCALAR_BY_SPAN[fast_span]
capped_forecast_by_span[fast_span] = max(min(scaled_forecast, self.ABS_FORECAST_CAP), -self.ABS_FORECAST_CAP)
raw_combined_forecast = sum(capped_forecast_by_span.values()) / len(capped_forecast_by_span)
scaled_combined_forecast = raw_combined_forecast * 1.08
self.continuous_forecast = max(min(scaled_combined_forecast, self.ABS_FORECAST_CAP), -self.ABS_FORECAST_CAP)
if self.current_hurst >= self.HURST_THRESHOLD:
self.volatility_regime = "TREND"
if self.continuous_forecast > 5.0:
self.macro_bias = 1
elif self.continuous_forecast < -5.0:
self.macro_bias = -1
else:
self.macro_bias = 0
else:
self.volatility_regime = "CHOP"
self.macro_bias = 0
def on_m15_bar(self, sender, bar):
self.last_m15_low = bar.low
self.last_m15_high = bar.high
def on_m5_bar(self, sender, bar):
self.vol_window.add(bar.volume)
if self.current_m5_delta != 0:
self.wfo_historical_delta.add(abs(self.current_m5_delta))
self.delta_window.add(self.current_m5_delta)
self.current_m5_delta = 0
if self.algo.is_warming_up: return
if not self.vol_window.is_ready or not self.m5_atr.is_ready or not self.m5_bb.is_ready: return
if self.volatility_regime == "CHOP":
band_tolerance = self.m5_atr.current.value * 0.5
if bar.close <= (self.m5_bb.lower_band.current.value + band_tolerance):
self.macro_bias = 1
elif bar.close >= (self.m5_bb.upper_band.current.value - band_tolerance):
self.macro_bias = -1
else:
self.macro_bias = 0
if self.macro_bias != 0:
self.execute_shadow_entry("BUY" if self.macro_bias == 1 else "SELL")
elif self.volatility_regime == "TREND":
if self.macro_bias == 0:
if self.absorption_zone is not None:
self.absorption_zone = None
self.displaced = False
return
if self.absorption_zone is None:
self.detect_absorption(bar)
elif not self.displaced:
self.detect_displacement(bar)
else:
self.detect_retest(bar)
def check_adverse_selection(self):
if self.entry_ticket is None: return
mapped = self.future.mapped
if mapped is None or not self.algo.securities.contains_key(mapped): return
security = self.algo.securities[mapped]
limit_price = self.entry_ticket.get(OrderField.LIMIT_PRICE)
tick_size = security.symbol_properties.minimum_price_variation
if self.pending_side == "BUY":
if abs(security.bid_price - limit_price) <= (tick_size * 3):
if self.baseline_bid_liquidity == 0:
self.baseline_bid_liquidity = security.bid_size
if security.bid_size < (self.baseline_bid_liquidity * 0.30) and security.bid_size > 0:
self.algo.transactions.cancel_order(self.entry_ticket.order_id)
self.entry_ticket = None
self.baseline_bid_liquidity = 0
return
elif self.pending_side == "SELL":
if abs(security.ask_price - limit_price) <= (tick_size * 3):
if self.baseline_bid_liquidity == 0:
self.baseline_bid_liquidity = security.ask_size
if security.ask_size < (self.baseline_bid_liquidity * 0.30) and security.ask_size > 0:
self.algo.transactions.cancel_order(self.entry_ticket.order_id)
self.entry_ticket = None
self.baseline_bid_liquidity = 0
return
def update_trailing_stop(self):
if self.entry_regime != "TREND": return
mapped = self.future.mapped
if mapped is None or not self.algo.securities.contains_key(mapped) or self.stop_ticket is None: return
if self.last_m15_low == 0.0 or self.last_m15_high == 0.0 or not self.m15_atr.is_ready: return
atr = self.m15_atr.current.value
tick_size = self.algo.securities[mapped].symbol_properties.minimum_price_variation
if self.trade_direction == "BUY":
new_stop_raw = self.last_m15_low - (atr * self.ATR_MULT)
current_stop = self.stop_ticket.get(OrderField.STOP_PRICE)
if new_stop_raw > current_stop:
self.move_stop(new_stop_raw, tick_size)
elif self.trade_direction == "SELL":
new_stop_raw = self.last_m15_high + (atr * self.ATR_MULT)
current_stop = self.stop_ticket.get(OrderField.STOP_PRICE)
if new_stop_raw < current_stop:
self.move_stop(new_stop_raw, tick_size)
def move_stop(self, raw_price, tick_size):
update_fields = UpdateOrderFields()
rounded_price = round(raw_price / tick_size) * tick_size
update_fields.stop_price = rounded_price
self.stop_ticket.update(update_fields)
def check_macro_decay(self, mapped):
if self.trade_direction is None or self.entry_price == 0.0: return
security = self.algo.securities[mapped]
current_qty = self.algo.portfolio[mapped].quantity
if current_qty == 0: return
decay = False
exit_reason = ""
if self.entry_regime == "TREND":
if (self.trade_direction == "BUY" and self.continuous_forecast < self.FORECAST_DECAY_THRESHOLD) or \
(self.trade_direction == "SELL" and self.continuous_forecast > -self.FORECAST_DECAY_THRESHOLD):
decay = True
exit_reason = f"TREND DECAY EXIT: Forecast={self.continuous_forecast:.2f}"
elif self.entry_regime == "CHOP" and self.active_target_price > 0:
if (self.trade_direction == "BUY" and security.price >= self.active_target_price) or \
(self.trade_direction == "SELL" and security.price <= self.active_target_price):
decay = True
exit_reason = f"MEAN REVERSION TARGET HIT (1:2 RR): Target={self.active_target_price:.4f}"
if decay:
self.algo.market_order(mapped, -current_qty)
if self.stop_ticket is not None:
self.algo.transactions.cancel_order(self.stop_ticket.order_id)
self.algo.debug(f"[{str(self.symbol)}] {exit_reason}. Closing {abs(int(current_qty))} contracts.")
def detect_absorption(self, bar):
vols = [x for x in self.vol_window][1:]
mean_vol = np.mean(vols)
std_vol = np.std(vols)
if mean_vol == 0 or std_vol == 0: return
z_score = (bar.volume - mean_vol) / std_vol
candle_body = abs(bar.close - bar.open)
current_atr = self.m5_atr.current.value
vol_cv = std_vol / mean_vol
raw_dynamic_z = 1.0 + (vol_cv * 2.0)
dynamic_z_threshold = max(1.2, min(raw_dynamic_z, 2.5))
if z_score > dynamic_z_threshold and candle_body < (current_atr * 0.5):
self.absorption_zone = bar
self.displaced = False
def detect_displacement(self, bar):
move = abs(bar.close - self.absorption_zone.close)
if move > (self.m5_atr.current.value * self.DISPLACEMENT_MULT):
self.displaced = True
def detect_retest(self, bar):
limit = self.absorption_zone.volume * self.RETEST_VOL_MAX
in_zone = (bar.low <= self.absorption_zone.high and bar.high >= self.absorption_zone.low)
if in_zone and bar.volume < limit:
self.execute_shadow_entry("BUY" if self.macro_bias == 1 else "SELL")
self.absorption_zone = None
self.displaced = False
def execute_shadow_entry(self, side):
if self.algo.portfolio[self.symbol].invested or self.entry_ticket is not None: return
if self.is_market_toxic: return
if not self.delta_window.is_ready: return
recent_delta = sum(list(self.delta_window)[:3])
if side == "BUY" and recent_delta < (self.DYNAMIC_DELTA_THRESHOLD * 0.6): return
if side == "SELL" and recent_delta > -(self.DYNAMIC_DELTA_THRESHOLD * 0.6): return
if not self.tick_times_ms.is_ready: return
timestamps = np.array(list(self.tick_times_ms)[::-1], dtype=np.float64)
current_heat = jit_hawkes(timestamps)
if current_heat > (self.HAWKES_EXCITATION_THRESHOLD * 1.5): return
mapped = self.future.mapped
if mapped is None or not self.algo.securities.contains_key(mapped) or not self.m15_atr.is_ready: return
security = self.algo.securities[mapped]
vwmp_price = self.calculate_vwmp(security.bid_price, security.bid_size, security.ask_price, security.ask_size)
if vwmp_price == 0: return
current_equity = self.algo.portfolio.total_portfolio_value
dynamic_risk_pct = self.BASE_RISK_PCT
if self.baseline_atr > 0 and self.h_atr.is_ready:
current_hourly_atr = self.h_atr.current.value
if current_hourly_atr > 0:
vol_scalar = self.baseline_atr / current_hourly_atr
vol_scalar = max(0.25, min(vol_scalar, 1.5))
dynamic_risk_pct = self.BASE_RISK_PCT * vol_scalar
if self.volatility_regime == "CHOP":
target_price = self.m5_bb.middle_band.current.value
expected_profit = abs(vwmp_price - target_price)
stop_distance = expected_profit / 2.0
if stop_distance < (security.symbol_properties.minimum_price_variation * 4):
return
self.pending_target_price = target_price
else:
atr = self.m15_atr.current.value
stop_distance = atr * self.ATR_MULT
self.pending_target_price = 0.0
multiplier = security.symbol_properties.contract_multiplier
risk_per_contract = stop_distance * multiplier
raw_qty = int((current_equity * dynamic_risk_pct) / risk_per_contract) if risk_per_contract > 0 else 0
qty = max(min(raw_qty, 10), 1)
self.pending_side = side
self.pending_qty = qty if side == "BUY" else -qty
self.pending_stop_dist = stop_distance
self.pending_regime = self.volatility_regime
tick_size = security.symbol_properties.minimum_price_variation
raw_entry = security.bid_price if side == "BUY" else security.ask_price
entry_price = round(raw_entry / tick_size) * tick_size
self.entry_ticket = self.algo.limit_order(mapped, self.pending_qty, entry_price)
def handle_order_event(self, order_event):
if order_event.status != OrderStatus.FILLED: return
mapped = self.future.mapped
if mapped is None or order_event.symbol != mapped: return
fill_price = order_event.fill_price
fill_qty = order_event.fill_quantity
tick_size = self.algo.securities[mapped].symbol_properties.minimum_price_variation
# 1. Catch the Entry Fill
if self.entry_ticket and order_event.order_id == self.entry_ticket.order_id:
if self.pending_stop_dist <= 0:
self.reset_trade_tracking()
return
if self.pending_side == "BUY": stop_price = fill_price - self.pending_stop_dist
else: stop_price = fill_price + self.pending_stop_dist
stop_price = round(stop_price / tick_size) * tick_size
self.stop_ticket = self.algo.stop_market_order(mapped, -self.pending_qty, stop_price)
self.trade_direction = self.pending_side
self.entry_price = fill_price
self.initial_stop_dist = self.pending_stop_dist
self.entry_regime = self.pending_regime
self.active_target_price = self.pending_target_price
self.entry_ticket = None
return
# 2. UNIVERSAL LEDGER PATCH
if self.trade_direction is not None:
is_closing_fill = (self.trade_direction == "BUY" and fill_qty < 0) or \
(self.trade_direction == "SELL" and fill_qty > 0)
if is_closing_fill:
multiplier = self.algo.securities[mapped].symbol_properties.contract_multiplier
if self.trade_direction == "BUY":
points_captured = fill_price - self.entry_price
else:
points_captured = self.entry_price - fill_price
pnl = points_captured * abs(fill_qty) * multiplier
r_multiple = points_captured / self.initial_stop_dist if self.initial_stop_dist > 0 else 0
exit_type = "TRADE CLOSED"
if self.stop_ticket and order_event.order_id == self.stop_ticket.order_id:
exit_type = "STOP HIT"
self.algo.debug(f"{exit_type} [{self.entry_regime}] | {str(self.symbol)} | Dir: {self.trade_direction} | Entry: {self.entry_price:.2f} | Exit: {fill_price:.2f} | Qty: {abs(fill_qty)} | PnL: ${pnl:.2f} | R-Mult: {r_multiple:.2f}R")
if self.algo.portfolio[mapped].quantity == 0:
self.reset_trade_tracking()
def reset_trade_tracking(self):
self.trade_direction = None
self.stop_ticket = None
self.entry_ticket = None
self.entry_price = 0.0
self.initial_stop_dist = 0.0
self.entry_regime = None
self.pending_target_price = 0.0
self.active_target_price = 0.0
def handle_contract_roll(self, new_mapped):
if not self.algo.securities.contains_key(new_mapped): return
old_stop_price = None
if self.stop_ticket is not None:
try:
old_stop_price = self.stop_ticket.get(OrderField.STOP_PRICE)
self.algo.transactions.cancel_order(self.stop_ticket.order_id)
except: pass
self.stop_ticket = None
if self.entry_ticket is not None:
try:
self.algo.transactions.cancel_order(self.entry_ticket.order_id)
except: pass
self.entry_ticket = None
qty = self.algo.portfolio[new_mapped].quantity
if qty == 0:
self.algo.debug(
f"CONTRACT ROLL [{str(self.symbol)}]: No position found on "
f"{new_mapped}. Trade state reset."
)
self.reset_trade_tracking()
return
if old_stop_price is not None and self.trade_direction is not None:
tick_size = self.algo.securities[new_mapped].symbol_properties.minimum_price_variation
stop_price = round(old_stop_price / tick_size) * tick_size
self.stop_ticket = self.algo.stop_market_order(new_mapped, -qty, stop_price)
self.algo.debug(
f"CONTRACT ROLL [{str(self.symbol)}]: Stop re-placed on "
f"{new_mapped} @ {stop_price:.4f} (dir: {self.trade_direction})"
)