| Overall Statistics |
|
Total Orders 59 Average Win 4.85% Average Loss -0.64% Compounding Annual Return 9.175% Drawdown 9.600% Expectancy 2.190 Start Equity 100000 End Equity 142087.66 Net Profit 42.088% Sharpe Ratio 0.549 Sortino Ratio 0.592 Probabilistic Sharpe Ratio 27.760% Loss Rate 63% Win Rate 37% Profit-Loss Ratio 7.61 Alpha 0 Beta 0 Annual Standard Deviation 0.084 Annual Variance 0.007 Information Ratio 0.785 Tracking Error 0.084 Treynor Ratio 0 Total Fees $111.72 Estimated Strategy Capacity $1400000000.00 Lowest Capacity Asset MES YEBKSYL2454X Portfolio Turnover 2.80% Drawdown Recovery 241 |
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from numba import jit
# ==============================================================================
# MACHINE-CODE MATH ENGINE
# ==============================================================================
@jit(nopython=True)
def jit_hawkes(timestamps, alpha=0.1, beta=0.5, baseline_mu=0.01):
    """Return the final intensity of an exponentially decaying self-exciting recursion.

    Each event adds ``alpha`` to the intensity; between consecutive events the
    excess over ``baseline_mu`` decays by ``exp(-beta * gap)``.

    Args:
        timestamps: 1-D array of event times in ascending order.
        alpha: Jump added to the intensity at every event.
        beta: Decay rate, expressed per unit of ``timestamps``.
        baseline_mu: Floor intensity the process decays towards.

    Returns:
        The intensity right after the last event, or ``baseline_mu`` when
        fewer than two timestamps are supplied.

    NOTE(review): callers in this file pass millisecond timestamps; with the
    default ``beta`` a one-second gap yields ``exp(-500)``, so the carried
    excitation is negligible and the result is ~``baseline_mu + alpha``.
    Confirm the intended time unit.
    """
    count = len(timestamps)
    if count < 2:
        return baseline_mu

    # Only the most recent intensity is returned, so carry a single scalar
    # forward instead of materialising the whole intensity path.
    level = baseline_mu + alpha
    for k in range(1, count):
        gap = timestamps[k] - timestamps[k - 1]
        level = baseline_mu + (level - baseline_mu) * np.exp(-beta * gap) + alpha
    return level
@jit(nopython=True)
def jit_linregress(x, y):
    """Return the ordinary-least-squares slope of ``y`` regressed on ``x``.

    Args:
        x: 1-D float array of regressor values.
        y: 1-D float array of responses, same length as ``x``.

    Returns:
        The fitted slope. When the slope is undefined because ``x`` has no
        spread (all values equal, a single point, or empty input) the function
        returns ``0.0`` instead of dividing by zero.
    """
    n = len(x)
    sum_x = np.sum(x)
    sum_y = np.sum(y)
    sum_xx = np.sum(x * x)
    sum_xy = np.sum(x * y)

    # n * Sxx - Sx^2 is n^2 times the variance of x; zero means a vertical /
    # degenerate fit for which no slope exists.
    denominator = n * sum_xx - sum_x * sum_x
    if denominator == 0.0:
        return 0.0
    return (n * sum_xy - sum_x * sum_y) / denominator
@jit(nopython=True)
def jit_hurst(prices, max_lag=20):
    """Estimate the Hurst exponent of a price series by rescaled-range analysis.

    Log returns are cut into non-overlapping chunks of every length ``lag`` in
    ``[2, max_lag)``. For each lag the mean rescaled range (R/S) over its
    chunks is computed, and the exponent is the slope of ``log(R/S)`` against
    ``log(lag)``.

    Args:
        prices: 1-D float array of prices, oldest first.
        max_lag: Exclusive upper bound on the chunk lengths used.

    Returns:
        The regression slope, or the neutral value ``0.5`` whenever the
        estimate cannot be formed: fewer than 100 prices, any non-positive
        price (log return undefined), or fewer than two lags with a strictly
        positive mean R/S (e.g. a flat series).
    """
    n = len(prices)
    max_entries = max_lag - 2
    # A regression needs at least two distinct lags; this also prevents a
    # negative-size allocation when max_lag < 2.
    if n < 100 or max_entries < 2:
        return 0.5

    returns = np.empty(n - 1)
    for i in range(n - 1):
        # The log return is undefined for zero/negative prices anywhere in
        # the window, not just at the first element.
        if prices[i] <= 0.0 or prices[i + 1] <= 0.0:
            return 0.5
        returns[i] = np.log(prices[i + 1] / prices[i])

    rs_values = np.empty(max_entries)
    valid_lags = np.empty(max_entries)
    count = 0
    n_returns = n - 1
    for lag in range(2, max_lag):
        num_chunks = n_returns // lag
        if num_chunks == 0:
            continue
        rs_sum = 0.0
        for c in range(num_chunks):
            chunk = returns[c * lag:(c + 1) * lag]
            mean = np.mean(chunk)
            cum_dev = 0.0
            max_dev = 0.0
            min_dev = 0.0
            variance = 0.0
            for val in chunk:
                dev = val - mean
                cum_dev += dev
                if cum_dev > max_dev:
                    max_dev = cum_dev
                if cum_dev < min_dev:
                    min_dev = cum_dev
                variance += dev * dev
            r = max_dev - min_dev
            s = np.sqrt(variance / lag)
            if s == 0:
                s = 1e-8  # avoid dividing by zero on a constant chunk
            rs_sum += r / s
        mean_rs = rs_sum / num_chunks
        # log(0) would be -inf and poison the regression with NaN, so lags
        # whose range collapsed to zero are left out.
        if mean_rs <= 0.0:
            continue
        rs_values[count] = mean_rs
        valid_lags[count] = lag
        count += 1

    if count < 2:
        return 0.5
    x = np.log(valid_lags[:count])
    y = np.log(rs_values[:count])
    return jit_linregress(x, y)
@jit(nopython=True)
def jit_vpin(buy_vol_array, sell_vol_array, bucket_size, window=50):
    """Return the mean normalised buy/sell imbalance of the latest buckets.

    Args:
        buy_vol_array: Buy volume per bucket, oldest first.
        sell_vol_array: Sell volume per bucket, aligned with ``buy_vol_array``.
        bucket_size: Nominal bucket volume used to normalise each imbalance.
        window: Number of trailing buckets to average.

    Returns:
        Average of ``|buy - sell| / bucket_size`` over the last ``window``
        buckets, or ``0.0`` when fewer than ``window`` buckets are available.
    """
    total_buckets = len(buy_vol_array)
    if total_buckets < window:
        return 0.0

    # Accumulate only the trailing window, oldest bucket first.
    imbalance_sum = 0.0
    for k in range(total_buckets - window, total_buckets):
        imbalance_sum += np.abs(buy_vol_array[k] - sell_vol_array[k]) / bucket_size
    return imbalance_sum / window
# ==============================================================================
# MASTER MATRIX CONTROLLER
# ==============================================================================
class MedallionCapitvioMatrixAlgo(QCAlgorithm):
    """Portfolio-level controller for the futures basket.

    Owns one ``SymbolData`` per subscribed continuous future, enforces a daily
    drawdown kill switch, triggers the monthly threshold recalibration and
    forwards bars, contract rolls and order events to the per-asset logic.
    """

    def initialize(self):
        """Configure the backtest, subscribe the basket and schedule housekeeping."""
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2023, 12, 31)
        self.set_cash(100000)
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)

        # ---- UNIVERSE: only MES is subscribed at the moment ----
        basket = [
            Futures.Indices.MICRO_SP_500_E_MINI,  # MES
        ]
        self.symbol_data = {}        # continuous-future symbol -> SymbolData
        self.symbol_to_ticker = {}   # continuous-future symbol -> basket ticker
        self.set_warm_up(timedelta(days=60))
        for ticker in basket:
            future = self.add_future(ticker, Resolution.SECOND)
            future.set_filter(0, 90)
            self.symbol_data[future.symbol] = SymbolData(self, future)
            self.symbol_to_ticker[future.symbol] = ticker

        self.CORRELATION_GROUPS = []
        self.MAX_DAILY_DRAWDOWN = 0.04  # fraction of start-of-day equity
        self.high_water_mark = self.portfolio.total_portfolio_value
        self.start_of_day_equity = self.portfolio.total_portfolio_value
        self.daily_kill_switch_tripped = False
        self.algo_id = "CAPITVIO_MATRIX_MES_MGC_ALL6"

        self.schedule.on(self.date_rules.every_day(),
                         self.time_rules.at(0, 5),
                         self.reset_daily_metrics)
        self.schedule.on(self.date_rules.month_start(),
                         self.time_rules.at(0, 10),
                         self.run_walk_forward_optimization)

    def reset_daily_metrics(self):
        """Re-arm the kill switch and snapshot equity for the new trading day."""
        equity = self.portfolio.total_portfolio_value
        self.start_of_day_equity = equity
        self.daily_kill_switch_tripped = False
        if equity > self.high_water_mark:
            self.high_water_mark = equity

    def run_walk_forward_optimization(self):
        """Ask every asset to recalibrate its thresholds (skipped during warm-up)."""
        if self.is_warming_up:
            return
        self.debug(f"[{self.time}] Executing Monthly Walk-Forward Parameter Calibration...")
        for sd in self.symbol_data.values():
            sd.recalibrate_wfo_parameters()

    def on_data(self, data):
        """Apply the daily kill switch, then drive every asset with the new slice."""
        if self.daily_kill_switch_tripped:
            return

        # Guard the division: a non-positive start-of-day equity gives no
        # meaningful drawdown ratio.
        if self.start_of_day_equity > 0:
            current_drawdown = (self.start_of_day_equity - self.portfolio.total_portfolio_value) / self.start_of_day_equity
            if current_drawdown >= self.MAX_DAILY_DRAWDOWN:
                self.debug(f"MATRIX KILL SWITCH TRIPPED. Global Drawdown hit {current_drawdown:.2%}")
                self.daily_kill_switch_tripped = True
                self.liquidate()
                self.transactions.cancel_open_orders()
                return

        for symbol, sd in self.symbol_data.items():
            # Process deferred contract roll if new data is now available
            sd.process_deferred_roll(data)
            if data.bars.contains_key(symbol):
                sd.process_trade_bar(data.bars[symbol])
            if sd.entry_ticket is not None:
                sd.check_adverse_selection()

            mapped = sd.future.mapped
            if mapped is not None:
                if sd.last_known_mapped is not None and sd.last_known_mapped != mapped:
                    self.debug(f"CONTRACT ROLL [{str(symbol)}]: {sd.last_known_mapped} → {mapped}")
                    sd.handle_contract_roll(mapped)
                sd.last_known_mapped = mapped
            if mapped and self.portfolio[mapped].invested:
                sd.update_trailing_stop()
                sd.check_macro_decay(mapped)

    def on_order_event(self, order_event):
        """Route an order event to the asset that owns the traded contract.

        Orders are submitted on the mapped contract, while ``symbol_data`` is
        keyed by the continuous-future symbol, so a plain dictionary lookup on
        ``order_event.symbol`` never matches. The event is therefore matched
        against each asset's own key, its currently mapped contract and the
        contract it is rolling into.
        """
        symbol = order_event.symbol
        for key, sd in self.symbol_data.items():
            candidates = (key, sd.future.mapped, sd.roll_new_mapped)
            if any(candidate is not None and candidate == symbol for candidate in candidates):
                sd.handle_order_event(order_event)
                return
# ==============================================================================
# INDEPENDENT ASSET BRAIN – Absorption/Retest Model (All 6 Improvements)
# ==============================================================================
class SymbolData:
def __init__(self, algo, future):
    """Create the per-future state: consolidators, indicators, rolling windows and trade tracking.

    Args:
        algo: The owning algorithm; used for subscriptions, orders and logging.
        future: The continuous future returned by ``add_future``.
    """
    self.algo = algo
    self.symbol = future.symbol
    self.future = future

    # --- Hourly layer: regime (Hurst) and EWMAC forecast inputs ---
    # NOTE: the bar handler is attached before the indicators are registered
    # on the same consolidator; keep this order unless the event ordering is
    # re-verified.
    self.hourly = TradeBarConsolidator(timedelta(minutes=60))
    self.hourly.data_consolidated += self.on_hourly_bar
    self.algo.subscription_manager.add_consolidator(self.symbol, self.hourly)
    self.h_std = StandardDeviation(20); self.h_atr = AverageTrueRange(20)
    self.algo.register_indicator(self.symbol, self.h_std, self.hourly)
    self.algo.register_indicator(self.symbol, self.h_atr, self.hourly)
    self.hourly_closes = RollingWindow[float](250)
    self.current_hurst = 0.5
    self.HURST_THRESHOLD = 0.50
    # Fast/slow EMA spans are paired by index: (4, 16), (8, 32), (16, 64).
    self.emac_spans = [4, 8, 16]; self.slow_spans = [16, 32, 64]
    self.ema_dict = {}
    for span in (self.emac_spans + self.slow_spans):
        self.ema_dict[span] = ExponentialMovingAverage(span)
        self.algo.register_indicator(self.symbol, self.ema_dict[span], self.hourly)
    # Per-fast-span multiplier applied to the ATR-normalised EMA spread.
    self.FORECAST_SCALAR_BY_SPAN = {4: 8.53, 8: 5.95, 16: 4.10}
    self.ABS_FORECAST_CAP = 20
    self.macro_bias = 0                 # +1 long, -1 short, 0 flat
    self.volatility_regime = "WARMUP"   # becomes "TREND" or "CHOP" in on_hourly_bar
    self.continuous_forecast = 0.0
    # Forecast magnitude required before a TREND bias is taken.
    self.FORECAST_ENTRY_THRESHOLD = 5.0

    # --- 15-minute layer: stop distance and trailing-stop anchors ---
    self.m15 = TradeBarConsolidator(timedelta(minutes=15))
    self.m15.data_consolidated += self.on_m15_bar
    self.algo.subscription_manager.add_consolidator(self.symbol, self.m15)
    self.m15_atr = AverageTrueRange(14)
    self.algo.register_indicator(self.symbol, self.m15_atr, self.m15)
    self.last_m15_low = 0.0; self.last_m15_high = 0.0

    # --- 5-minute layer: entry signals ---
    self.m5 = TradeBarConsolidator(timedelta(minutes=5))
    self.m5.data_consolidated += self.on_m5_bar
    self.algo.subscription_manager.add_consolidator(self.symbol, self.m5)
    self.m5_atr = AverageTrueRange(14)
    self.algo.register_indicator(self.symbol, self.m5_atr, self.m5)
    self.m5_bb = BollingerBands(20, 2.0)
    self.algo.register_indicator(self.symbol, self.m5_bb, self.m5)
    self.vol_window = RollingWindow[float](50)
    # Signed volume accumulated by process_trade_bar since the last 5-minute bar.
    self.current_m5_delta = 0; self.last_trade_price = 0.0
    self.delta_window = RollingWindow[float](5)

    # --- VPIN: volume buckets filled by process_trade_bar ---
    self.vpin_bucket_size = 5000
    self.vpin_buy_window = RollingWindow[float](50); self.vpin_sell_window = RollingWindow[float](50)
    self.current_bucket_buy_vol = 0; self.current_bucket_sell_vol = 0
    self.is_market_toxic = False

    # Hawkes input: bar end times in milliseconds
    self.tick_times_ms = RollingWindow[float](100)

    # --- Histories consumed by recalibrate_wfo_parameters ---
    self.wfo_historical_vpin = RollingWindow[float](720)
    self.wfo_historical_hawkes = RollingWindow[float](720)
    self.wfo_historical_delta = RollingWindow[float](720)
    self.wfo_historical_atr = RollingWindow[float](720)
    self.baseline_atr = 0.0
    # Starting thresholds; recalibration replaces them with the 70th / 85th /
    # 70th percentiles of the histories above (subject to floors).
    self.vpin_toxicity_threshold = 0.75
    self.HAWKES_EXCITATION_THRESHOLD = 5.0
    self.DYNAMIC_DELTA_THRESHOLD = 150.0

    # --- Absorption/Retest state ---
    self.absorption_zone = None; self.displaced = False
    self.absorption_bars_since = 0  # 5-minute bars elapsed since the zone formed (time decay)

    # --- Order and trade tracking ---
    self.entry_ticket = None; self.stop_ticket = None
    # Reference quote size for check_adverse_selection (used for the ask side too).
    self.baseline_bid_liquidity = 0
    self.pending_side = None; self.pending_qty = 0; self.pending_stop_dist = 0.0
    self.trade_direction = None; self.entry_price = 0.0
    self.last_known_mapped = None
    self.pending_regime = None
    self.entry_regime = None
    self.pending_target_price = 0.0
    self.active_target_price = 0.0
    self.initial_stop_dist = 0.0
    self.FORECAST_DECAY_THRESHOLD = 4.0
    self.BASE_RISK_PCT = 0.004
    self.ATR_MULT = 1.75
    self.DISPLACEMENT_MULT = 0.5  # kept but not used for new logic
    self.RETEST_VOL_MAX = 0.45  # retest bar volume cap, as a fraction of the zone's volume
    # Rolling window of 5-minute candle body sizes for the dynamic body filter
    self.m5_body_window = RollingWindow[float](20)

    # --- Contract roll (immediate and deferred) ---
    self.roll_in_progress = False
    self.roll_new_mapped = None
    self.roll_old_quantity = 0
    self.deferred_roll = False
    self.deferred_roll_new_mapped = None
    self.deferred_roll_old_quantity = 0
    self.deferred_roll_side = None
    self.deferred_roll_stop_dist = 0.0
def calculate_vwmp(self, best_bid_price, best_bid_vol, best_ask_price, best_ask_vol):
    """Return the volume-weighted mid price of the top of book.

    Each side's price is weighted by the opposite side's size. With no
    displayed size on either side the plain mid is returned when the bid is
    positive, otherwise ``0``.
    """
    depth = best_bid_vol + best_ask_vol
    if depth != 0:
        bid_weight = best_ask_vol / depth
        ask_weight = best_bid_vol / depth
        return best_bid_price * bid_weight + best_ask_price * ask_weight

    if best_bid_price > 0:
        return (best_bid_price + best_ask_price) / 2
    return 0
def recalibrate_wfo_parameters(self):
if not self.wfo_historical_vpin.is_ready or not self.wfo_historical_hawkes.is_ready or not self.wfo_historical_delta.is_ready:
return
vpin_array = np.array(list(self.wfo_historical_vpin), dtype=np.float64)
hawkes_array = np.array(list(self.wfo_historical_hawkes), dtype=np.float64)
delta_array = np.array(list(self.wfo_historical_delta), dtype=np.float64)
new_vpin_thresh = np.percentile(vpin_array, 70)
new_hawkes_thresh = np.percentile(hawkes_array, 85)
new_delta_thresh = np.percentile(delta_array, 70)
self.vpin_toxicity_threshold = max(0.15, new_vpin_thresh)
self.HAWKES_EXCITATION_THRESHOLD = max(0.50, new_hawkes_thresh)
self.DYNAMIC_DELTA_THRESHOLD = max(10.0, new_delta_thresh)
if self.wfo_historical_atr.is_ready:
atr_array = np.array(list(self.wfo_historical_atr), dtype=np.float64)
self.baseline_atr = np.median(atr_array)
self.algo.debug(f"WFO [{str(self.symbol)}]: VPIN: {self.vpin_toxicity_threshold:.2f} | Delta: {self.DYNAMIC_DELTA_THRESHOLD:.0f} | Base Vol: {self.baseline_atr:.2f}")
def process_trade_bar(self, bar):
self.tick_times_ms.add(bar.end_time.timestamp() * 1000) # milliseconds
if bar.close > self.last_trade_price:
self.current_m5_delta += bar.volume
self.current_bucket_buy_vol += bar.volume
elif bar.close < self.last_trade_price:
self.current_m5_delta -= bar.volume
self.current_bucket_sell_vol += bar.volume
else:
self.current_bucket_buy_vol += bar.volume / 2
self.current_bucket_sell_vol += bar.volume / 2
self.last_trade_price = bar.close
if (self.current_bucket_buy_vol + self.current_bucket_sell_vol) >= self.vpin_bucket_size:
self.vpin_buy_window.add(self.current_bucket_buy_vol)
self.vpin_sell_window.add(self.current_bucket_sell_vol)
self.current_bucket_buy_vol = 0
self.current_bucket_sell_vol = 0
if self.vpin_buy_window.is_ready:
buy_vol_array = np.array(list(self.vpin_buy_window)[::-1], dtype=np.float64)
sell_vol_array = np.array(list(self.vpin_sell_window)[::-1], dtype=np.float64)
self.is_market_toxic = jit_vpin(buy_vol_array, sell_vol_array, float(self.vpin_bucket_size)) > self.vpin_toxicity_threshold
def on_hourly_bar(self, sender, bar):
self.hourly_closes.add(bar.close)
if self.h_atr.is_ready:
self.wfo_historical_atr.add(self.h_atr.current.value)
if self.vpin_buy_window.is_ready:
buy_vol_array = np.array(list(self.vpin_buy_window)[::-1], dtype=np.float64)
sell_vol_array = np.array(list(self.vpin_sell_window)[::-1], dtype=np.float64)
current_vpin = jit_vpin(buy_vol_array, sell_vol_array, float(self.vpin_bucket_size))
self.wfo_historical_vpin.add(current_vpin)
if self.tick_times_ms.is_ready:
timestamps = np.array(list(self.tick_times_ms)[::-1], dtype=np.float64)
self.wfo_historical_hawkes.add(jit_hawkes(timestamps))
if not self.hourly_closes.is_ready or not self.h_std.is_ready or not self.h_atr.is_ready: return
for ema in self.ema_dict.values():
if not ema.is_ready: return
prices_array = np.array(list(self.hourly_closes)[::-1], dtype=np.float64)
self.current_hurst = jit_hurst(prices_array)
capped_forecast_by_span = {}
hourly_risk_price_terms = self.h_atr.current.value
if hourly_risk_price_terms == 0: return
for i, fast_span in enumerate(self.emac_spans):
slow_span = self.slow_spans[i]
fast_ema = self.ema_dict[fast_span].current.value
slow_ema = self.ema_dict[slow_span].current.value
risk_adjusted_ewmac = (fast_ema - slow_ema) / hourly_risk_price_terms
scaled_forecast = risk_adjusted_ewmac * self.FORECAST_SCALAR_BY_SPAN[fast_span]
capped_forecast_by_span[fast_span] = max(min(scaled_forecast, self.ABS_FORECAST_CAP), -self.ABS_FORECAST_CAP)
raw_combined_forecast = sum(capped_forecast_by_span.values()) / len(capped_forecast_by_span)
scaled_combined_forecast = raw_combined_forecast * 1.08
self.continuous_forecast = max(min(scaled_combined_forecast, self.ABS_FORECAST_CAP), -self.ABS_FORECAST_CAP)
# --- TREND / CHOP ---
if self.current_hurst >= self.HURST_THRESHOLD:
self.volatility_regime = "TREND"
if self.continuous_forecast > self.FORECAST_ENTRY_THRESHOLD:
self.macro_bias = 1
elif self.continuous_forecast < -self.FORECAST_ENTRY_THRESHOLD:
self.macro_bias = -1
else:
self.macro_bias = 0
else:
self.volatility_regime = "CHOP"
self.macro_bias = 0
def on_m15_bar(self, sender, bar):
self.last_m15_low = bar.low
self.last_m15_high = bar.high
def on_m5_bar(self, sender, bar):
self.vol_window.add(bar.volume)
# Change 5: track candle body
self.m5_body_window.add(abs(bar.close - bar.open))
if self.current_m5_delta != 0:
self.wfo_historical_delta.add(abs(self.current_m5_delta))
self.delta_window.add(self.current_m5_delta)
self.current_m5_delta = 0
if self.algo.is_warming_up: return
if not self.vol_window.is_ready or not self.m5_atr.is_ready or not self.m5_bb.is_ready: return
# CHOP mean reversion
if self.volatility_regime == "CHOP":
band_tolerance = self.m5_atr.current.value * 0.5
if bar.close <= (self.m5_bb.lower_band.current.value + band_tolerance):
self.macro_bias = 1
elif bar.close >= (self.m5_bb.upper_band.current.value - band_tolerance):
self.macro_bias = -1
else:
self.macro_bias = 0
if self.macro_bias != 0:
self.execute_shadow_entry("BUY" if self.macro_bias == 1 else "SELL")
# TREND absorption/retest (all improvements)
elif self.volatility_regime == "TREND":
if self.macro_bias == 0:
if self.absorption_zone is not None:
self.absorption_zone = None
self.displaced = False
self.absorption_bars_since = 0
return
# ---- Time‑decay on active zone (before displacement) ----
if self.absorption_zone is not None and not self.displaced:
self.absorption_bars_since += 1
if self.absorption_bars_since > 10:
self.absorption_zone = None
self.displaced = False
self.absorption_bars_since = 0
# Proceed with detection logic
if self.absorption_zone is None:
self.detect_absorption(bar)
elif not self.displaced:
self.detect_displacement(bar)
else:
self.detect_retest(bar)
# ---------- Absorption / Retest (with all changes) ----------
def detect_absorption(self, bar):
vols = [x for x in self.vol_window][1:]
mean_vol = np.mean(vols)
std_vol = np.std(vols)
if mean_vol == 0 or std_vol == 0: return
z_score = (bar.volume - mean_vol) / std_vol
candle_body = abs(bar.close - bar.open)
current_atr = self.m5_atr.current.value
vol_cv = std_vol / mean_vol
raw_dynamic_z = 1.0 + (vol_cv * 2.0)
dynamic_z_threshold = max(1.2, min(raw_dynamic_z, 2.5))
# Change 5: dynamic body filter using average body size
avg_body = current_atr * 0.3 # fallback
if self.m5_body_window.is_ready:
avg_body = np.mean(list(self.m5_body_window))
if candle_body < avg_body * 1.2: # slightly wider than average
if z_score > dynamic_z_threshold:
self.absorption_zone = bar
self.displaced = False
self.absorption_bars_since = 0
def detect_displacement(self, bar):
# Change 6: displacement volume must be at least 50% of absorption volume
if self.absorption_zone is None: return
if bar.volume < self.absorption_zone.volume * 0.5:
return
# Change 2: displacement requires close outside the absorption zone's range
if self.macro_bias == 1: # bullish break
if bar.close > self.absorption_zone.high:
self.displaced = True
else: # bearish break
if bar.close < self.absorption_zone.low:
self.displaced = True
def detect_retest(self, bar):
# Change 3: stricter volume limit
limit = self.absorption_zone.volume * self.RETEST_VOL_MAX
in_zone = (bar.low <= self.absorption_zone.high and bar.high >= self.absorption_zone.low)
if not in_zone or bar.volume >= limit:
return
# Change 4: rejection clause
if self.macro_bias == 1: # long entry
# Rejection: low touches the zone, but close > open (bullish)
if bar.close <= bar.open:
return
else: # short entry
# Rejection: high touches the zone, but close < open (bearish)
if bar.close >= bar.open:
return
# All conditions met – execute
self.execute_shadow_entry("BUY" if self.macro_bias == 1 else "SELL")
self.absorption_zone = None
self.displaced = False
self.absorption_bars_since = 0
# ---------- Execution ----------
def check_adverse_selection(self):
if self.entry_ticket is None: return
mapped = self.future.mapped
if mapped is None or not self.algo.securities.contains_key(mapped): return
security = self.algo.securities[mapped]
limit_price = self.entry_ticket.get(OrderField.LIMIT_PRICE)
tick_size = security.symbol_properties.minimum_price_variation
if self.pending_side == "BUY":
if abs(security.bid_price - limit_price) <= (tick_size * 3):
if self.baseline_bid_liquidity == 0:
self.baseline_bid_liquidity = security.bid_size
if security.bid_size < (self.baseline_bid_liquidity * 0.30) and security.bid_size > 0:
self.algo.transactions.cancel_order(self.entry_ticket.order_id)
self.entry_ticket = None
self.baseline_bid_liquidity = 0
return
elif self.pending_side == "SELL":
if abs(security.ask_price - limit_price) <= (tick_size * 3):
if self.baseline_bid_liquidity == 0:
self.baseline_bid_liquidity = security.ask_size
if security.ask_size < (self.baseline_bid_liquidity * 0.30) and security.ask_size > 0:
self.algo.transactions.cancel_order(self.entry_ticket.order_id)
self.entry_ticket = None
self.baseline_bid_liquidity = 0
return
def update_trailing_stop(self):
if self.entry_regime != "TREND": return
mapped = self.future.mapped
if mapped is None or not self.algo.securities.contains_key(mapped) or self.stop_ticket is None: return
if self.last_m15_low == 0.0 or self.last_m15_high == 0.0 or not self.m15_atr.is_ready: return
atr = self.m15_atr.current.value
tick_size = self.algo.securities[mapped].symbol_properties.minimum_price_variation
if self.trade_direction == "BUY":
new_stop_raw = self.last_m15_low - (atr * self.ATR_MULT)
current_stop = self.stop_ticket.get(OrderField.STOP_PRICE)
if new_stop_raw > current_stop:
self.move_stop(new_stop_raw, tick_size)
elif self.trade_direction == "SELL":
new_stop_raw = self.last_m15_high + (atr * self.ATR_MULT)
current_stop = self.stop_ticket.get(OrderField.STOP_PRICE)
if new_stop_raw < current_stop:
self.move_stop(new_stop_raw, tick_size)
def move_stop(self, raw_price, tick_size):
    """Update the stop order to ``raw_price`` rounded to the nearest tick."""
    request = UpdateOrderFields()
    request.stop_price = round(raw_price / tick_size) * tick_size
    self.stop_ticket.update(request)
def check_macro_decay(self, mapped):
if self.trade_direction is None or self.entry_price == 0.0: return
security = self.algo.securities[mapped]
current_qty = self.algo.portfolio[mapped].quantity
if current_qty == 0: return
decay = False
exit_reason = ""
if self.entry_regime == "TREND":
if (self.trade_direction == "BUY" and self.continuous_forecast < self.FORECAST_DECAY_THRESHOLD) or \
(self.trade_direction == "SELL" and self.continuous_forecast > -self.FORECAST_DECAY_THRESHOLD):
decay = True
exit_reason = f"TREND DECAY EXIT: Forecast={self.continuous_forecast:.2f}"
elif self.entry_regime == "CHOP" and self.active_target_price > 0:
if (self.trade_direction == "BUY" and security.price >= self.active_target_price) or \
(self.trade_direction == "SELL" and security.price <= self.active_target_price):
decay = True
exit_reason = f"MEAN REVERSION TARGET HIT (1:2 RR): Target={self.active_target_price:.4f}"
if decay:
self.algo.market_order(mapped, -current_qty)
if self.stop_ticket is not None:
self.algo.transactions.cancel_order(self.stop_ticket.order_id)
self.algo.debug(f"[{str(self.symbol)}] {exit_reason}. Closing {abs(int(current_qty))} contracts.")
def execute_shadow_entry(self, side):
if self.algo.portfolio[self.symbol].invested or self.entry_ticket is not None: return
if self.is_market_toxic: return
if not self.delta_window.is_ready: return
recent_delta = sum(list(self.delta_window)[:3])
if side == "BUY" and recent_delta < (self.DYNAMIC_DELTA_THRESHOLD * 0.6): return
if side == "SELL" and recent_delta > -(self.DYNAMIC_DELTA_THRESHOLD * 0.6): return
if not self.tick_times_ms.is_ready: return
timestamps = np.array(list(self.tick_times_ms)[::-1], dtype=np.float64)
current_heat = jit_hawkes(timestamps)
if current_heat > (self.HAWKES_EXCITATION_THRESHOLD * 1.5): return
mapped = self.future.mapped
if mapped is None or not self.algo.securities.contains_key(mapped) or not self.m15_atr.is_ready: return
security = self.algo.securities[mapped]
vwmp_price = self.calculate_vwmp(security.bid_price, security.bid_size, security.ask_price, security.ask_size)
if vwmp_price == 0: return
current_equity = self.algo.portfolio.total_portfolio_value
dynamic_risk_pct = self.BASE_RISK_PCT
if self.baseline_atr > 0 and self.h_atr.is_ready:
current_hourly_atr = self.h_atr.current.value
if current_hourly_atr > 0:
vol_scalar = self.baseline_atr / current_hourly_atr
vol_scalar = max(0.25, min(vol_scalar, 1.5))
dynamic_risk_pct = self.BASE_RISK_PCT * vol_scalar
if self.volatility_regime == "CHOP":
target_price = self.m5_bb.middle_band.current.value
expected_profit = abs(vwmp_price - target_price)
stop_distance = expected_profit / 2.0
if stop_distance < (security.symbol_properties.minimum_price_variation * 4):
return
self.pending_target_price = target_price
else:
atr = self.m15_atr.current.value
stop_distance = atr * self.ATR_MULT
self.pending_target_price = 0.0
multiplier = security.symbol_properties.contract_multiplier
risk_per_contract = stop_distance * multiplier
raw_qty = int((current_equity * dynamic_risk_pct) / risk_per_contract) if risk_per_contract > 0 else 0
qty = max(min(raw_qty, 10), 1)
self.pending_side = side
self.pending_qty = qty if side == "BUY" else -qty
self.pending_stop_dist = stop_distance
self.pending_regime = self.volatility_regime
tick_size = security.symbol_properties.minimum_price_variation
raw_entry = security.bid_price if side == "BUY" else security.ask_price
entry_price = round(raw_entry / tick_size) * tick_size
self.entry_ticket = self.algo.limit_order(mapped, self.pending_qty, entry_price)
def handle_order_event(self, order_event):
    """Update trade state from an order event on this asset.

    Handles, in order:

    1. Cancelled / invalid entry orders - the entry ticket is released so
       new entries are not blocked by a dead order.
    2. The opening fill of a contract roll - re-places the stop on the new
       contract.
    3. Entry fills - places the initial stop and promotes the pending trade
       parameters to active ones.
    4. Closing fills - logs the result and resets tracking once flat,
       cancelling the stop if something other than the stop closed the trade.
    """
    status = order_event.status

    # A dead entry order must not keep blocking execute_shadow_entry.
    if status == OrderStatus.CANCELED or status == OrderStatus.INVALID:
        entry = self.entry_ticket
        if entry is not None and order_event.order_id == entry.order_id:
            self.entry_ticket = None
            self.baseline_bid_liquidity = 0
        return

    if status != OrderStatus.FILLED:
        return
    mapped = self.future.mapped
    if mapped is None or order_event.symbol != mapped:
        return

    fill_price = order_event.fill_price
    fill_qty = order_event.fill_quantity
    tick_size = self.algo.securities[mapped].symbol_properties.minimum_price_variation

    # Roll opening fill
    if self.roll_in_progress and order_event.symbol == self.roll_new_mapped and fill_qty == self.roll_old_quantity:
        if self.pending_side == "BUY":
            stop_price = fill_price - self.pending_stop_dist
        else:
            stop_price = fill_price + self.pending_stop_dist
        stop_price = round(stop_price / tick_size) * tick_size
        self.stop_ticket = self.algo.stop_market_order(self.roll_new_mapped, -fill_qty, stop_price)
        self.entry_price = fill_price
        self.roll_in_progress = False
        self.roll_new_mapped = None
        self.roll_old_quantity = 0
        return

    # Entry fill
    if self.entry_ticket and order_event.order_id == self.entry_ticket.order_id:
        if self.pending_stop_dist <= 0:
            self.reset_trade_tracking()
            return
        if self.pending_side == "BUY":
            stop_price = fill_price - self.pending_stop_dist
        else:
            stop_price = fill_price + self.pending_stop_dist
        stop_price = round(stop_price / tick_size) * tick_size
        self.stop_ticket = self.algo.stop_market_order(mapped, -self.pending_qty, stop_price)
        self.trade_direction = self.pending_side
        self.entry_price = fill_price
        self.initial_stop_dist = self.pending_stop_dist
        self.entry_regime = self.pending_regime
        self.active_target_price = self.pending_target_price
        self.entry_ticket = None
        return

    # Closing fill
    if self.trade_direction is None:
        return
    is_closing_fill = (self.trade_direction == "BUY" and fill_qty < 0) or \
                      (self.trade_direction == "SELL" and fill_qty > 0)
    if not is_closing_fill:
        return

    multiplier = self.algo.securities[mapped].symbol_properties.contract_multiplier
    if self.trade_direction == "BUY":
        points_captured = fill_price - self.entry_price
    else:
        points_captured = self.entry_price - fill_price
    pnl = points_captured * abs(fill_qty) * multiplier
    r_multiple = points_captured / self.initial_stop_dist if self.initial_stop_dist > 0 else 0
    stop = self.stop_ticket
    closed_by_stop = bool(stop and order_event.order_id == stop.order_id)
    exit_type = "STOP HIT" if closed_by_stop else "TRADE CLOSED"
    self.algo.debug(f"{exit_type} [{self.entry_regime}] | {str(self.symbol)} | Dir: {self.trade_direction} | Entry: {self.entry_price:.2f} | Exit: {fill_price:.2f} | Qty: {abs(fill_qty)} | PnL: ${pnl:.2f} | R-Mult: {r_multiple:.2f}R")

    if self.algo.portfolio[mapped].quantity == 0:
        # The position is flat. If something other than the stop closed it,
        # the stop is still working and would open a fresh position when
        # touched, so cancel it before dropping the reference.
        if stop is not None and not closed_by_stop:
            stop_status = stop.status
            still_open = (stop_status != OrderStatus.FILLED
                          and stop_status != OrderStatus.CANCELED
                          and stop_status != OrderStatus.INVALID)
            if still_open:
                self.algo.transactions.cancel_order(stop.order_id)
        self.reset_trade_tracking()
def reset_trade_tracking(self):
self.trade_direction = None
self.stop_ticket = None
self.entry_ticket = None
self.entry_price = 0.0
self.initial_stop_dist = 0.0
self.entry_regime = None
self.pending_target_price = 0.0
self.active_target_price = 0.0
self.roll_in_progress = False
self.roll_new_mapped = None
self.roll_old_quantity = 0
self.deferred_roll = False
self.deferred_roll_new_mapped = None
self.deferred_roll_old_quantity = 0
self.deferred_roll_side = None
self.deferred_roll_stop_dist = 0.0
self.absorption_bars_since = 0
# ---------- Deferred contract roll ----------
def process_deferred_roll(self, data):
if not self.deferred_roll: return
new_mapped = self.deferred_roll_new_mapped
if new_mapped is None or not data.bars.contains_key(new_mapped): return
old_mapped = self.last_known_mapped
if old_mapped is None:
self.reset_trade_tracking()
return
self.algo.debug(f"EXECUTING DEFERRED ROLL [{str(self.symbol)}]: transferring {self.deferred_roll_old_quantity} to {new_mapped}")
old_qty = self.deferred_roll_old_quantity
if old_qty != 0:
self.algo.market_order(old_mapped, -old_qty)
self.algo.market_order(new_mapped, old_qty)
self.roll_in_progress = True
self.roll_new_mapped = new_mapped
self.roll_old_quantity = old_qty
self.pending_side = self.deferred_roll_side
self.pending_stop_dist = self.deferred_roll_stop_dist
self.pending_regime = self.entry_regime
self.pending_target_price = self.active_target_price
self.deferred_roll = False
self.deferred_roll_new_mapped = None
self.deferred_roll_old_quantity = 0
self.deferred_roll_side = None
self.deferred_roll_stop_dist = 0.0
def handle_contract_roll(self, new_mapped):
if not self.algo.securities.contains_key(new_mapped): return
old_mapped = self.last_known_mapped
if old_mapped is None or not self.algo.securities.contains_key(old_mapped):
self.reset_trade_tracking()
return
if not self.algo.securities[new_mapped].has_data:
self.algo.debug(f"CONTRACT ROLL [{str(self.symbol)}]: New contract {new_mapped} has no data yet. Deferring.")
if self.entry_ticket is not None:
self.algo.transactions.cancel_order(self.entry_ticket.order_id)
self.entry_ticket = None
if self.stop_ticket is not None:
self.algo.transactions.cancel_order(self.stop_ticket.order_id)
self.stop_ticket = None
old_quantity = self.algo.portfolio[old_mapped].quantity
if old_quantity != 0:
self.deferred_roll = True
self.deferred_roll_new_mapped = new_mapped
self.deferred_roll_old_quantity = old_quantity
self.deferred_roll_side = "BUY" if old_quantity > 0 else "SELL"
self.deferred_roll_stop_dist = self.initial_stop_dist
return
if self.entry_ticket is not None:
self.algo.transactions.cancel_order(self.entry_ticket.order_id)
self.entry_ticket = None
if self.stop_ticket is not None:
self.algo.transactions.cancel_order(self.stop_ticket.order_id)
self.stop_ticket = None
old_quantity = self.algo.portfolio[old_mapped].quantity
if old_quantity == 0:
self.reset_trade_tracking()
return
side = "BUY" if old_quantity > 0 else "SELL"
stop_dist = self.initial_stop_dist
self.algo.market_order(old_mapped, -old_quantity)
self.algo.market_order(new_mapped, old_quantity)
self.roll_in_progress = True
self.roll_new_mapped = new_mapped
self.roll_old_quantity = old_quantity
self.pending_side = side
self.pending_stop_dist = stop_dist
self.pending_regime = self.entry_regime
self.pending_target_price = self.active_target_price
self.algo.debug(f"CONTRACT ROLL [{str(self.symbol)}]: transferred {old_quantity} contracts to {new_mapped}. Awaiting fill to re‑place stop.")from AlgorithmImports import *
class MedallionRegimeMasterAlgo(QCAlgorithm):
def initialize(self):
    """Configure the backtest, subscribe the MNQ future and build the manually driven indicators."""
    self.set_start_date(2023, 1, 1)
    self.set_end_date(2024, 1, 1)
    self.set_cash(50000)
    self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
    # Continuous MNQ contract on raw prices, mapped by open interest, front contract.
    self.mnq = self.add_future(Futures.Indices.MICRO_NASDAQ_100_E_MINI,
                               resolution=Resolution.MINUTE,
                               data_normalization_mode=DataNormalizationMode.RAW,
                               data_mapping_mode=DataMappingMode.OPEN_INTEREST,
                               contract_depth_offset=0)
    self.mnq.set_filter(timedelta(0), timedelta(90))
    # These indicators are not registered with the engine; on_data updates
    # them by hand from the bar seen at minute 0 of each hour.
    self.my_adx = AverageDirectionalIndex(14)
    self.my_atr = AverageTrueRange(14, MovingAverageType.SIMPLE)
    self.my_keltner = KeltnerChannels(20, 1.5, MovingAverageType.EXPONENTIAL)
    self.ema_200 = ExponentialMovingAverage(200)
    self.set_warm_up(200, Resolution.HOUR)
    # Recent ATR values; determine_regime uses their median as the volatility threshold.
    self.atr_window = RollingWindow[float](50)
    self.my_atr.updated += self.on_atr_updated
    self.stop_loss_ticket = None
    self.take_profit_ticket = None
    # Daily risk limits, re-armed by reset_daily_limits at midnight.
    self.daily_start_equity = self.portfolio.total_portfolio_value
    # NOTE(review): presumably a percentage of daily_start_equity; the check
    # that consumes it is not visible here - confirm.
    self.daily_loss_limit_pct = 2.0
    self.trading_halted_today = False
    self.daily_trades_taken = 0
    self.max_trades_per_day = 3
    self.schedule.on(self.date_rules.every_day(),
                     self.time_rules.midnight,
                     self.reset_daily_limits)
    self.settings.seed_initial_prices = True
    self.debug(f"Algorithm initialized at {self.time}")
def on_atr_updated(self, sender, updated):
    """Append each new ATR reading to ``atr_window`` once the ATR is ready."""
    if not self.my_atr.is_ready:
        return
    self.atr_window.add(updated.value)
def reset_daily_limits(self):
    """Start a new trading day: snapshot equity, lift the halt, zero the trade count."""
    self.daily_trades_taken = 0
    self.trading_halted_today = False
    self.daily_start_equity = self.portfolio.total_portfolio_value
def on_data(self, data):
    """Feed the hourly indicators and, outside warm-up, evaluate entries once per hour.

    The indicators are updated by hand, so they must also be fed while the
    algorithm is warming up; otherwise the warm-up period passes without
    priming them and they only start filling after it ends. Trading
    decisions remain disabled during warm-up.
    """
    mapped = self.mnq.mapped
    if mapped is None or not data.contains_key(mapped):
        return
    bar = data[mapped]

    # NOTE(review): the indicators see only the bar present at minute 0 of
    # each hour, not a consolidated hourly bar - confirm this is intended.
    on_the_hour = self.time.minute == 0
    if on_the_hour:
        self.my_adx.update(bar)
        self.my_atr.update(bar)
        self.my_keltner.update(bar)
        self.ema_200.update(self.time, bar.close)

    if self.is_warming_up or not on_the_hour:
        return
    if not self._indicators_ready():
        return
    if self.trading_halted_today:
        return
    if self._check_daily_loss_limit():
        return
    if not self._is_trading_session():
        return
    if self.portfolio.invested or self.daily_trades_taken >= self.max_trades_per_day:
        return

    current_price = bar.close
    regime = self.determine_regime(current_price)
    if self.time.hour % 2 == 0:
        self.debug(f"[{self.time}] Regime: {regime} | ADX: {self.my_adx.current.value:.1f} | Price: {current_price:.2f} | EMA200: {self.ema_200.current.value:.2f}")
    if regime == "High_Vol_Trend":
        self.run_high_vol_trend_logic(current_price, mapped)
    elif regime == "Low_Vol_Trend":
        self.run_low_vol_trend_logic(current_price, mapped)
def determine_regime(self, current_price):
    """Classify the market as a high-vol trend, low-vol trend, or range.

    Trend is defined as ADX above 20. Volatility is "high" when the
    current ATR exceeds the median of the ATR readings held in
    ``self.atr_window``.

    Args:
        current_price: Latest price; a non-positive value is treated as
            invalid and yields ``"Range_Regime"``.

    Returns:
        One of ``"High_Vol_Trend"``, ``"Low_Vol_Trend"`` or
        ``"Range_Regime"``.
    """
    if current_price <= 0:
        return "Range_Regime"

    atr_history = list(self.atr_window)
    if len(atr_history) < 2:
        return "Range_Regime"

    # Upper median of the stored ATR readings.
    vol_threshold = sorted(atr_history)[len(atr_history) // 2]

    # The window stores raw ATR values (see on_atr_updated), so the current
    # reading must be compared in the same units. Comparing a
    # percent-of-price ATR against a raw-points median made the high-vol
    # test effectively always False.
    current_atr = self.my_atr.current.value

    if not self.my_adx.current.value > 20:
        return "Range_Regime"
    if current_atr > vol_threshold:
        return "High_Vol_Trend"
    return "Low_Vol_Trend"
def run_high_vol_trend_logic(self, price, symbol):
    """Enter a breakout long when price is at/above the upper Keltner band.

    The entry also requires price to be above the 200-period EMA. The stop
    sits 1.15 ATR below the entry and the target 4.5 ATR above it. No order
    is sent when the sizing routine returns zero.

    Args:
        price: Current price used as the entry reference.
        symbol: Contract symbol to trade.
    """
    upper_band = self.my_keltner.upper_band.current.value
    trend_ema = self.ema_200.current.value
    if not (price >= upper_band and price > trend_ema):
        return

    atr = self.my_atr.current.value
    stop_price = price - (atr * 1.15)
    target_price = price + (atr * 4.5)

    size = self.calculate_volatility_adjusted_size(symbol, price, stop_price)
    if size > 0:
        self.execute_trade_with_bracket(symbol, size, stop_price, target_price, "PROD: High-Vol Long")
def run_low_vol_trend_logic(self, price, symbol):
    """Enter a dip-buy long in a low-volatility uptrend.

    A dip is a price at or below the Keltner middle band plus 20% of the
    band width, while still above the 200-period EMA. The stop sits 1.0 ATR
    below the entry and the target 3.5 ATR above it. No order is sent when
    the sizing routine returns zero.

    Args:
        price: Current price used as the entry reference.
        symbol: Contract symbol to trade.
    """
    channel = self.my_keltner
    keltner_range = channel.upper_band.current.value - channel.lower_band.current.value
    dip_threshold = channel.middle_band.current.value + (keltner_range * 0.2)
    trend_ema = self.ema_200.current.value
    if not (price <= dip_threshold and price > trend_ema):
        return

    atr = self.my_atr.current.value
    stop_price = price - (atr * 1.0)
    target_price = price + (atr * 3.5)

    size = self.calculate_volatility_adjusted_size(symbol, price, stop_price)
    if size > 0:
        self.execute_trade_with_bracket(symbol, size, stop_price, target_price, "PROD: Low-Vol Long Dip")
def execute_trade_with_bracket(self, symbol, size, stop_price, target_price, tag):
    """Send a market entry and attach a stop-loss / take-profit pair.

    The protective orders are opposite-side orders for the same quantity.
    Their tickets are stored on ``self.stop_loss_ticket`` and
    ``self.take_profit_ticket`` so ``on_order_event`` can cancel the
    surviving leg when the other one fills.

    Args:
        symbol: Contract symbol to trade.
        size: Signed entry quantity.
        stop_price: Trigger price for the stop-market exit.
        target_price: Limit price for the profit-taking exit.
        tag: Order tag; the exits reuse it with ``_SL`` / ``_TP`` suffixes.
    """
    entry_ticket = self.market_order(symbol, size, tag=tag)

    # If the entry was rejected or canceled there is no position to
    # protect. Submitting the exit legs anyway could open an unintended
    # position in the opposite direction, and the attempt must not use up
    # one of the day's trade slots.
    if entry_ticket is not None and (
        entry_ticket.status == OrderStatus.INVALID
        or entry_ticket.status == OrderStatus.CANCELED
    ):
        self.debug(f"[{self.time}] {tag} | Entry order not accepted ({entry_ticket.status}); bracket skipped")
        return

    self.stop_loss_ticket = self.stop_market_order(symbol, -size, stop_price, tag=f"{tag}_SL")
    self.take_profit_ticket = self.limit_order(symbol, -size, target_price, tag=f"{tag}_TP")
    self.daily_trades_taken += 1
    self.debug(f"[{self.time}] {tag} | Size: {size} | Stop: {stop_price:.2f} | Target: {target_price:.2f}")
def on_order_event(self, order_event):
    """Cancel the remaining bracket leg when the other one fills.

    Only FILLED events are acted on. When the filled order is the tracked
    take-profit, the stop-loss is canceled (and vice versa); both ticket
    references are then cleared. Fills of any other order are ignored.

    Args:
        order_event: The order event; ``status`` and ``order_id`` are read.
    """
    if order_event.status != OrderStatus.FILLED:
        return

    take_profit = self.take_profit_ticket
    stop_loss = self.stop_loss_ticket
    filled_id = order_event.order_id

    if take_profit is not None and filled_id == take_profit.order_id:
        survivor, reason = stop_loss, "Target reached."
    elif stop_loss is not None and filled_id == stop_loss.order_id:
        survivor, reason = take_profit, "Stopped out."
    else:
        # Not one of the bracket legs (e.g. the entry order itself).
        return

    if survivor is not None:
        survivor.cancel(reason)
    self.take_profit_ticket = None
    self.stop_loss_ticket = None
def calculate_volatility_adjusted_size(self, symbol, entry_price, stop_loss_price, risk_pct=1.0):
    """Size a position so a stop-out loses about ``risk_pct`` of equity.

    The risk-based quantity is capped by how many contracts the remaining
    margin can support.

    Args:
        symbol: Contract symbol being sized; looked up in ``self.securities``.
        entry_price: Intended entry price.
        stop_loss_price: Intended stop price.
        risk_pct: Percent of total portfolio value to risk on the trade.

    Returns:
        A non-negative whole number of contracts (0 when the trade cannot
        be sized).
    """
    trade_risk_in_points = abs(entry_price - stop_loss_price)
    if trade_risk_in_points == 0:
        return 0

    security = self.securities[symbol]

    # Dollar value of one point comes from the contract itself rather than
    # a hard-coded constant, so sizing stays correct if the traded future
    # changes. 2.0 is the value previously hard-coded here and is kept as
    # the fallback.
    point_value = security.symbol_properties.contract_multiplier
    if not point_value or point_value <= 0:
        point_value = 2.0

    capital_to_risk = self.portfolio.total_portfolio_value * (risk_pct / 100.0)
    raw_quantity = int(capital_to_risk / (trade_risk_in_points * point_value))
    if raw_quantity <= 0:
        # Risk budget cannot afford a single contract; no margin check needed.
        return 0

    margin_params = InitialMarginParameters(security, 1)
    margin_per_contract = security.buying_power_model.get_initial_margin_requirement(margin_params).value
    if margin_per_contract <= 0:
        # Missing or nonsensical margin figure: use a conservative flat estimate.
        margin_per_contract = 1500.0

    max_contracts_allowed = int(self.portfolio.margin_remaining / margin_per_contract)
    return max(0, min(raw_quantity, max_contracts_allowed))
def _indicators_ready(self):
return (self.atr_window.is_ready and
self.ema_200.is_ready and
self.my_adx.is_ready and
self.my_keltner.is_ready)
def _check_daily_loss_limit(self):
current_equity = self.portfolio.total_portfolio_value
if self.daily_start_equity == 0:
return False
daily_return_pct = ((current_equity - self.daily_start_equity) / self.daily_start_equity) * 100
if daily_return_pct <= -self.daily_loss_limit_pct:
self.liquidate()
self.trading_halted_today = True
self.debug(f"[{self.time}] Daily loss limit hit: {daily_return_pct:.2f}%")
return True
return False
def _is_trading_session(self):
if self.time.hour < 9 or self.time.hour >= 16:
return False
if self.time.hour == 9 and self.time.minute < 30:
return False
return True