# Overall Statistics
#
# | Statistic | Value |
# |---|---|
# | Total Orders | 120 |
# | Average Win | 0.98% |
# | Average Loss | -0.13% |
# | Compounding Annual Return | 2.033% |
# | Drawdown | 4.900% |
# | Expectancy | 1.099 |
# | Start Equity | 100000 |
# | End Equity | 108387.98 |
# | Net Profit | 8.388% |
# | Sharpe Ratio | -0.222 |
# | Sortino Ratio | -0.107 |
# | Probabilistic Sharpe Ratio | 14.331% |
# | Loss Rate | 75% |
# | Win Rate | 25% |
# | Profit-Loss Ratio | 7.55 |
# | Alpha | 0 |
# | Beta | 0 |
# | Annual Standard Deviation | 0.025 |
# | Annual Variance | 0.001 |
# | Information Ratio | 0.571 |
# | Tracking Error | 0.025 |
# | Treynor Ratio | 0 |
# | Total Fees | $277.02 |
# | Estimated Strategy Capacity | $24000000.00 |
# | Lowest Capacity Asset | MES YEBKSYL2454X |
# | Portfolio Turnover | 6.27% |
# | Drawdown Recovery | 340 |
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
from numba import jit
# ==============================================================================
# JIT Hurst (original, proven) + JIT Hawkes
# ==============================================================================
@jit(nopython=True)
def jit_linregress(x, y):
    """Return the ordinary-least-squares slope of ``y`` regressed on ``x``.

    Args:
        x: 1-D float array of regressor values.
        y: 1-D float array of responses, same length as ``x``.

    Returns:
        The slope ``(n*Sxy - Sx*Sy) / (n*Sxx - Sx^2)``, or ``nan`` when the
        denominator is zero (empty input, a single point, or all ``x``
        identical), because no slope is defined in that case.
    """
    n = len(x)
    sum_x = np.sum(x)
    sum_y = np.sum(y)
    sum_xx = np.sum(x * x)
    sum_xy = np.sum(x * y)
    denom = n * sum_xx - sum_x * sum_x
    # Guard explicitly: compiled code would otherwise raise on division by zero.
    if denom == 0.0:
        return np.nan
    return (n * sum_xy - sum_x * sum_y) / denom
@jit(nopython=True)
def jit_hurst(prices, max_lag=20):
    """Estimate the Hurst exponent of a price series by rescaled-range analysis.

    For every lag in ``[2, max_lag)`` the log-return series is cut into
    non-overlapping chunks of that length, the rescaled range R/S is averaged
    over the chunks, and the exponent is the slope of ``log(R/S)`` against
    ``log(lag)``.

    Args:
        prices: 1-D float array of prices in chronological order.
        max_lag: exclusive upper bound of the chunk lengths.

    Returns:
        The fitted slope, or the neutral value ``0.5`` when the input cannot
        support a fit: fewer than 100 prices, ``max_lag < 4`` (fewer than two
        lags), any non-positive or non-finite price, or fewer than two lags
        with a strictly positive average R/S.
    """
    n = len(prices)
    if n < 100 or max_lag < 4:
        return 0.5
    # Every price feeds a log-ratio, so all of them must be positive and finite
    # (the comparison is written so that NaN also fails it).
    for i in range(n):
        p = prices[i]
        if not (p > 0.0) or not np.isfinite(p):
            return 0.5
    returns = np.log(prices[1:] / prices[:-1])
    max_entries = max_lag - 2
    rs_values = np.empty(max_entries)
    valid_lags = np.empty(max_entries)
    count = 0
    for lag in range(2, max_lag):
        num_chunks = len(returns) // lag
        if num_chunks == 0:
            continue
        rs_sum = 0.0
        for c in range(num_chunks):
            chunk = returns[c * lag:(c + 1) * lag]
            mean_val = np.mean(chunk)
            cum_dev = 0.0
            max_dev = 0.0
            min_dev = 0.0
            variance = 0.0
            for val in chunk:
                dev = val - mean_val
                cum_dev += dev
                if cum_dev > max_dev:
                    max_dev = cum_dev
                if cum_dev < min_dev:
                    min_dev = cum_dev
                variance += dev * dev
            r = max_dev - min_dev
            s = np.sqrt(variance / lag)
            if s == 0:
                s = 1e-8
            rs_sum += (r / s)
        rs_mean = rs_sum / num_chunks
        # A flat stretch yields R/S == 0; log(0) would be -inf and turn the
        # regression slope into NaN, so such lags are left out of the fit.
        if rs_mean <= 0.0:
            continue
        rs_values[count] = rs_mean
        valid_lags[count] = lag
        count += 1
    # A slope needs at least two distinct lags.
    if count < 2:
        return 0.5
    x = np.log(valid_lags[:count])
    y = np.log(rs_values[:count])
    return jit_linregress(x, y)
@jit(nopython=True)
def jit_hawkes(timestamps, alpha=0.1, beta=0.5, baseline_mu=0.01):
    """Return the exponential-kernel Hawkes intensity right after the last event.

    The intensity starts at ``baseline_mu + alpha``; for each later event the
    excess over ``baseline_mu`` decays by ``exp(-beta * gap)`` and then jumps
    by ``alpha``.

    Args:
        timestamps: 1-D array of event times in chronological order.
        alpha: jump added to the intensity at every event.
        beta: decay rate applied per unit of ``timestamps``.
        baseline_mu: background intensity.

    Returns:
        The intensity at the final event, or ``baseline_mu`` when fewer than
        two timestamps are supplied.
    """
    count = len(timestamps)
    if count < 2:
        return baseline_mu
    # Only the most recent intensity is needed, so carry a single scalar.
    level = baseline_mu + alpha
    for k in range(1, count):
        gap = timestamps[k] - timestamps[k - 1]  # ms
        # NOTE(review): with millisecond gaps and beta=0.5 the decay factor is
        # ~0 for any gap of a few tens of ms or more — confirm intended units.
        level = baseline_mu + (level - baseline_mu) * np.exp(-beta * gap) + alpha
    return level
# ==============================================================================
# MASTER MATRIX – MES + MGC (duck‑typed, JIT Hurst, trend‑only)
# ==============================================================================
class MedallionCapitvioMatrixAlgo(QCAlgorithm):
    """Trend-only absorption/retest strategy on the Micro E-mini S&P 500 future.

    All per-instrument state (indicators, rolling windows, order tickets and
    contract-roll bookkeeping) is attached to each ``Future`` object by
    ``_duck_type_future``; the per-future helpers therefore take that object
    ``f`` as their first argument.
    """

    def initialize(self):
        """Configure the backtest, subscribe the futures and schedule housekeeping."""
        self.set_start_date(2020, 1, 1)
        self.set_end_date(2023, 12, 31)
        self.set_cash(100000)
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        self.set_warm_up(timedelta(days=60))
        # Universe
        self._futures = [
            self.add_future(Futures.Indices.MICRO_SP_500_E_MINI, Resolution.SECOND)
        ]
        for f in self._futures:
            f.set_filter(0, 90)
            self._duck_type_future(f)
        # Global risk
        self._max_daily_drawdown = 0.04
        self._high_water_mark = self.portfolio.total_portfolio_value
        self._start_of_day_equity = self.portfolio.total_portfolio_value
        self._daily_kill_switch_tripped = False
        self.schedule.on(self.date_rules.every_day(),
                         self.time_rules.at(0, 5),
                         self._reset_daily_metrics)
        self.schedule.on(self.date_rules.month_start(),
                         self.time_rules.at(0, 10),
                         self._run_walk_forward_optimization)

    # -------------------------------------------------------------------------
    # Duck-typing: attach state and indicators to the Future object
    # -------------------------------------------------------------------------
    def _duck_type_future(self, f):
        """Attach consolidators, indicators and all strategy state to ``f``."""
        f._tick_size = f.symbol_properties.minimum_price_variation
        # ---- Hourly consolidator + indicators ----
        f._h_cons = self.consolidate(f.symbol, timedelta(hours=1), self._on_hourly_bar)
        f._h_std = StandardDeviation(20)
        f._h_atr = AverageTrueRange(20)
        self.register_indicator(f.symbol, f._h_std, f._h_cons)
        self.register_indicator(f.symbol, f._h_atr, f._h_cons)
        f._ema_dict = {}
        for span in [4, 8, 16, 32, 64]:
            ema = ExponentialMovingAverage(span)
            f._ema_dict[span] = ema
            self.register_indicator(f.symbol, ema, f._h_cons)
        f._hourly_closes = RollingWindow[float](250)  # for JIT Hurst
        f._hurst_threshold = 0.50
        # ---- 15-min consolidator + indicators ----
        f._m15_cons = self.consolidate(f.symbol, timedelta(minutes=15), self._on_m15_bar)
        f._m15_atr = AverageTrueRange(14)
        self.register_indicator(f.symbol, f._m15_atr, f._m15_cons)
        f._m15_min = Minimum(1)
        f._m15_max = Maximum(1)
        self.register_indicator(f.symbol, f._m15_min, f._m15_cons, Field.LOW)
        self.register_indicator(f.symbol, f._m15_max, f._m15_cons, Field.HIGH)
        # ---- 5-min consolidator + indicators ----
        f._m5_cons = self.consolidate(f.symbol, timedelta(minutes=5), self._on_m5_bar)
        f._m5_atr = AverageTrueRange(14)
        f._m5_bb = BollingerBands(20, 2.0)
        self.register_indicator(f.symbol, f._m5_atr, f._m5_cons)
        self.register_indicator(f.symbol, f._m5_bb, f._m5_cons)
        # Micro-structure windows
        f._vol_window = RollingWindow[float](50)
        f._delta_window = RollingWindow[float](5)
        f._m5_body_window = RollingWindow[float](20)
        f._vpin_buy_window = RollingWindow[float](50)
        f._vpin_sell_window = RollingWindow[float](50)
        f._vpin_bucket_size = 5000
        f._current_bucket_buy = 0
        f._current_bucket_sell = 0
        f._is_market_toxic = False
        f._tick_times_ms = RollingWindow[float](100)
        # Walk-forward windows
        f._wfo_vpin = RollingWindow[float](720)
        f._wfo_hawkes = RollingWindow[float](720)
        f._wfo_delta = RollingWindow[float](720)
        f._wfo_atr = RollingWindow[float](720)
        f._baseline_atr = 0.0
        # Dynamic thresholds
        f._vpin_toxicity_threshold = 0.75
        f._hawkes_excitation_threshold = 5.0
        f._dynamic_delta_threshold = 150.0
        # Absorption / retest state
        f._absorption_zone = None
        f._displaced = False
        f._absorption_bars_since = 0
        f._entry_ticket = None
        f._stop_ticket = None
        f._baseline_bid_liquidity = 0
        f._pending_side = None
        f._pending_qty = 0
        f._pending_stop_dist = 0.0
        f._trade_direction = None
        f._entry_price = 0.0
        f._last_known_mapped = None
        f._pending_regime = None
        f._entry_regime = None
        f._pending_target_price = 0.0
        f._active_target_price = 0.0
        f._initial_stop_dist = 0.0
        # Parameters
        f._forecast_entry_threshold = 5.0
        f._forecast_decay_threshold = 4.0
        f._base_risk_pct = 0.004
        f._atr_mult = 1.75
        f._retest_vol_max = 0.45
        # Forecast state
        f._macro_bias = 0
        f._volatility_regime = "WARMUP"
        f._continuous_forecast = 0.0
        f._forecast_scalar_by_span = {4: 8.53, 8: 5.95, 16: 4.10}
        f._abs_forecast_cap = 20
        # Internals
        f._current_m5_delta = 0.0
        f._last_trade_price = 0.0
        # Roll support
        f._roll_in_progress = False
        f._roll_new_mapped = None
        f._roll_old_quantity = 0
        f._deferred_roll = False
        # Contract being rolled out of; kept separately because on_data
        # overwrites _last_known_mapped as soon as the roll is detected.
        f._deferred_roll_old_mapped = None
        f._deferred_roll_new_mapped = None
        f._deferred_roll_old_quantity = 0
        f._deferred_roll_side = None
        f._deferred_roll_stop_dist = 0.0

    # -------------------------------------------------------------------------
    # Daily / Monthly
    # -------------------------------------------------------------------------
    def _reset_daily_metrics(self):
        """Re-arm the daily kill switch and refresh the equity reference points."""
        equity = self.portfolio.total_portfolio_value
        self._start_of_day_equity = equity
        self._daily_kill_switch_tripped = False
        if equity > self._high_water_mark:
            self._high_water_mark = equity

    def _run_walk_forward_optimization(self):
        """Recalibrate the dynamic thresholds of every future (skipped in warm-up)."""
        if self.is_warming_up:
            return
        for f in self._futures:
            self._recalibrate_wfo(f)

    # -------------------------------------------------------------------------
    # OnData
    # -------------------------------------------------------------------------
    def on_data(self, data):
        """Enforce the daily kill switch, then run the per-future pipeline."""
        if not self._daily_kill_switch_tripped:
            start_equity = self._start_of_day_equity
            # Guard the division: a zero reference equity has no drawdown.
            if start_equity > 0:
                dd = (start_equity - self.portfolio.total_portfolio_value) / start_equity
                if dd >= self._max_daily_drawdown:
                    self.debug(f"KILL SWITCH {dd:.2%}")
                    self._daily_kill_switch_tripped = True
                    self.liquidate()
                    return
        if self._daily_kill_switch_tripped:
            return
        for f in self._futures:
            self._process_deferred_roll(f, data)
            if data.bars.contains_key(f.symbol):
                self._process_trade_bar(f, data.bars[f.symbol])
            if f._entry_ticket is not None:
                self._check_adverse_selection(f)
            mapped = f.mapped
            if mapped is not None:
                if f._last_known_mapped is not None and f._last_known_mapped != mapped:
                    self.debug(f"ROLL [{f.symbol}]: {f._last_known_mapped} -> {mapped}")
                    self._handle_contract_roll(f, mapped)
                f._last_known_mapped = mapped
            if mapped and self.portfolio[mapped].invested:
                self._update_trailing_stop(f)
                self._check_macro_decay(f, mapped)

    def on_order_event(self, order_event):
        """Route an order event to the future whose symbol it belongs to."""
        for f in self._futures:
            if order_event.symbol == f.mapped or order_event.symbol == f.symbol:
                self._handle_order_event(f, order_event)
                break

    # =========================================================================
    # Per-future logic
    # =========================================================================
    def _recalibrate_wfo(self, f):
        """Reset the VPIN / Hawkes / delta thresholds from their 720-sample windows."""
        if not f._wfo_vpin.is_ready or not f._wfo_hawkes.is_ready or not f._wfo_delta.is_ready:
            return
        vpin_arr = np.array(list(f._wfo_vpin), dtype=np.float64)
        hawkes_arr = np.array(list(f._wfo_hawkes), dtype=np.float64)
        delta_arr = np.array(list(f._wfo_delta), dtype=np.float64)
        f._vpin_toxicity_threshold = max(0.15, np.percentile(vpin_arr, 70))
        f._hawkes_excitation_threshold = max(0.50, np.percentile(hawkes_arr, 85))
        f._dynamic_delta_threshold = max(10.0, np.percentile(delta_arr, 70))
        if f._wfo_atr.is_ready:
            f._baseline_atr = np.median(np.array(list(f._wfo_atr), dtype=np.float64))

    def _mean_vpin(self, f):
        """Return the mean bucket imbalance over the VPIN windows, or None if not ready."""
        if not f._vpin_buy_window.is_ready:
            return None
        buy_arr = np.array(list(f._vpin_buy_window)[::-1], dtype=np.float64)
        sell_arr = np.array(list(f._vpin_sell_window)[::-1], dtype=np.float64)
        vpin = np.abs(buy_arr - sell_arr) / f._vpin_bucket_size
        return float(np.mean(vpin[-50:]))

    def _process_trade_bar(self, f, bar):
        """Classify the bar's volume by tick rule and update delta, VPIN and toxicity."""
        f._tick_times_ms.add(bar.end_time.timestamp() * 1000)
        if bar.close > f._last_trade_price:
            f._current_m5_delta += bar.volume
            f._current_bucket_buy += bar.volume
        elif bar.close < f._last_trade_price:
            f._current_m5_delta -= bar.volume
            f._current_bucket_sell += bar.volume
        else:
            # Unchanged price: split the volume evenly between both sides.
            f._current_bucket_buy += bar.volume / 2
            f._current_bucket_sell += bar.volume / 2
        f._last_trade_price = bar.close
        if (f._current_bucket_buy + f._current_bucket_sell) >= f._vpin_bucket_size:
            f._vpin_buy_window.add(f._current_bucket_buy)
            f._vpin_sell_window.add(f._current_bucket_sell)
            f._current_bucket_buy = 0
            f._current_bucket_sell = 0
            mean_vpin = self._mean_vpin(f)
            if mean_vpin is not None:
                f._is_market_toxic = mean_vpin > f._vpin_toxicity_threshold

    # Consolidator callbacks
    def _on_hourly_bar(self, bar):
        """Feed the walk-forward windows and refresh forecast, regime and bias."""
        f = self._find_future(bar.symbol)
        if f is None or not f._h_atr.is_ready:
            return
        f._hourly_closes.add(bar.close)
        f._wfo_atr.add(f._h_atr.current.value)
        mean_vpin = self._mean_vpin(f)
        if mean_vpin is not None:
            f._wfo_vpin.add(mean_vpin)
        if f._tick_times_ms.is_ready:
            ts = np.array(list(f._tick_times_ms)[::-1], dtype=np.float64)
            f._wfo_hawkes.add(jit_hawkes(ts))
        if f._delta_window.is_ready:
            # Without a sample here _wfo_delta never fills and _recalibrate_wfo
            # can never run. The sampled quantity mirrors what the entry gate
            # compares against the threshold (sum of the 3 latest 5-min deltas).
            # NOTE(review): confirm this is the intended calibration sample.
            f._wfo_delta.add(abs(sum(list(f._delta_window)[:3])))
        if not f._hourly_closes.is_ready:
            return
        prices = np.array(list(f._hourly_closes)[::-1], dtype=np.float64)
        hurst_val = jit_hurst(prices)
        # EWMAC forecast
        rate = f._h_atr.current.value
        if rate == 0:
            return
        cap = f._abs_forecast_cap
        caps = {}
        for fast_span, slow_span in zip([4, 8, 16], [16, 32, 64]):
            ewmac = (f._ema_dict[fast_span].current.value - f._ema_dict[slow_span].current.value) / rate
            scaled = ewmac * f._forecast_scalar_by_span[fast_span]
            caps[fast_span] = max(min(scaled, cap), -cap)
        raw = sum(caps.values()) / len(caps)
        f._continuous_forecast = max(min(raw * 1.08, cap), -cap)
        # Regime
        if hurst_val >= f._hurst_threshold:
            f._volatility_regime = "TREND"
            if f._continuous_forecast > f._forecast_entry_threshold:
                f._macro_bias = 1
            elif f._continuous_forecast < -f._forecast_entry_threshold:
                f._macro_bias = -1
            else:
                f._macro_bias = 0
        else:
            f._volatility_regime = "CHOP"
            f._macro_bias = 0

    def _on_m15_bar(self, bar):
        """No-op handler; the 15-minute consolidator only drives its indicators."""
        pass

    def _on_m5_bar(self, bar):
        """Update the 5-minute windows and advance the absorption state machine."""
        f = self._find_future(bar.symbol)
        if f is None:
            return
        # Keep the windows current during warm-up too; otherwise the delta
        # accumulated over the whole warm-up lands in the first live bar.
        f._vol_window.add(bar.volume)
        f._m5_body_window.add(abs(bar.close - bar.open))
        f._delta_window.add(f._current_m5_delta)
        f._current_m5_delta = 0
        if self.is_warming_up:
            return
        if not f._vol_window.is_ready or not f._m5_atr.is_ready or not f._m5_bb.is_ready:
            return
        # Only the TREND regime trades (CHOP and WARMUP take no action).
        if f._volatility_regime != "TREND":
            return
        if f._macro_bias == 0:
            if f._absorption_zone is not None:
                f._absorption_zone = None
                f._displaced = False
                f._absorption_bars_since = 0
            return
        if f._absorption_zone is not None and not f._displaced:
            # Drop a zone that has not been displaced within 10 bars.
            f._absorption_bars_since += 1
            if f._absorption_bars_since > 10:
                f._absorption_zone = None
                f._displaced = False
                f._absorption_bars_since = 0
        if f._absorption_zone is None:
            self._detect_absorption(f, bar)
        elif not f._displaced:
            self._detect_displacement(f, bar)
        else:
            self._detect_retest(f, bar)

    # Absorption / Retest helpers
    def _detect_absorption(self, f, bar):
        """Mark ``bar`` as an absorption zone: high-volume z-score with a small body."""
        vols = [x for x in f._vol_window][1:]  # skip index 0, the bar just added
        mean_v = np.mean(vols)
        std_v = np.std(vols)
        if mean_v == 0 or std_v == 0:
            return
        z = (bar.volume - mean_v) / std_v
        body = abs(bar.close - bar.open)
        dyn_z = max(1.2, min(1.0 + (std_v / mean_v) * 2.0, 2.5))
        avg_body = f._m5_atr.current.value * 0.3
        if f._m5_body_window.is_ready:
            avg_body = np.mean(list(f._m5_body_window))
        if body < avg_body * 1.2 and z > dyn_z:
            f._absorption_zone = bar
            f._displaced = False
            f._absorption_bars_since = 0

    def _detect_displacement(self, f, bar):
        """Flag displacement when a sufficiently active bar closes beyond the zone."""
        zone = f._absorption_zone
        if zone is None:
            return
        if bar.volume < zone.volume * 0.5:
            return
        if f._macro_bias == 1:
            if bar.close > zone.high:
                f._displaced = True
        else:
            if bar.close < zone.low:
                f._displaced = True

    def _detect_retest(self, f, bar):
        """Attempt an entry on a low-volume bar that re-enters the zone with the bias."""
        zone = f._absorption_zone
        limit = zone.volume * f._retest_vol_max
        in_zone = (bar.low <= zone.high and bar.high >= zone.low)
        if not in_zone or bar.volume >= limit:
            return
        if f._macro_bias == 1:
            if bar.close <= bar.open:
                return
        else:
            if bar.close >= bar.open:
                return
        self._execute_shadow_entry(f, "BUY" if f._macro_bias == 1 else "SELL")
        f._absorption_zone = None
        f._displaced = False
        f._absorption_bars_since = 0

    # Execution
    def _execute_shadow_entry(self, f, side):
        """Place a passive limit entry on the mapped contract if all filters pass.

        Args:
            f: the duck-typed future.
            side: ``"BUY"`` or ``"SELL"``.
        """
        if f._entry_ticket is not None or f._deferred_roll:
            return
        mapped = f.mapped
        if mapped is None or not self.securities.contains_key(mapped):
            return
        # Positions are held in the mapped contract, not in the continuous
        # symbol, so the "already invested" check must look at the former.
        if self.portfolio[mapped].invested:
            return
        if f._is_market_toxic:
            return
        if not f._delta_window.is_ready:
            return
        recent = sum(list(f._delta_window)[:3])
        delta_gate = f._dynamic_delta_threshold * 0.6
        if side == "BUY" and recent < delta_gate:
            return
        if side == "SELL" and recent > -delta_gate:
            return
        if f._tick_times_ms.is_ready:
            ts = np.array(list(f._tick_times_ms)[::-1], dtype=np.float64)
            if jit_hawkes(ts) > (f._hawkes_excitation_threshold * 1.5):
                return
        if not f._m15_atr.is_ready:
            return
        sec = self.securities[mapped]
        bid_p, bid_v = sec.bid_price, sec.bid_size
        ask_p, ask_v = sec.ask_price, sec.ask_size
        total = bid_v + ask_v
        vwmp = (bid_p * (ask_v / total) + ask_p * (bid_v / total)) if total > 0 else (bid_p + ask_p) / 2
        if vwmp == 0:
            return
        raw_entry = bid_p if side == "BUY" else ask_p
        stop_dist = f._m15_atr.current.value * f._atr_mult
        # Without a quote or a positive stop distance the fill handler could
        # not attach a protective stop, so no order is sent.
        if raw_entry <= 0 or stop_dist <= 0:
            return
        equity = self.portfolio.total_portfolio_value
        dyn_risk = f._base_risk_pct
        if f._baseline_atr > 0 and f._h_atr.is_ready and f._h_atr.current.value > 0:
            scalar = f._baseline_atr / f._h_atr.current.value
            dyn_risk = f._base_risk_pct * max(0.25, min(scalar, 1.5))
        multiplier = sec.symbol_properties.contract_multiplier
        risk_per_contract = stop_dist * multiplier
        qty = int((equity * dyn_risk) / risk_per_contract) if risk_per_contract > 0 else 0
        qty = max(min(qty, 10), 1)
        f._pending_target_price = 0.0
        f._pending_side = side
        f._pending_qty = qty if side == "BUY" else -qty
        f._pending_stop_dist = stop_dist
        f._pending_regime = f._volatility_regime
        # Start the adverse-selection baseline afresh for this order.
        f._baseline_bid_liquidity = 0
        price = round(raw_entry / f._tick_size) * f._tick_size
        f._entry_ticket = self.limit_order(mapped, f._pending_qty, price)

    def _check_adverse_selection(self, f):
        """Cancel the resting entry when the queue at its price level has mostly vanished."""
        ticket = f._entry_ticket
        if ticket is None:
            return
        # Use the order's own symbol: f.mapped may be None or may have rolled.
        if not self.securities.contains_key(ticket.symbol):
            return
        sec = self.securities[ticket.symbol]
        limit = ticket.get(OrderField.LIMIT_PRICE)
        if f._pending_side == "BUY":
            quote_price, quote_size = sec.bid_price, sec.bid_size
        else:
            quote_price, quote_size = sec.ask_price, sec.ask_size
        if abs(quote_price - limit) > (f._tick_size * 3):
            return
        if f._baseline_bid_liquidity == 0:
            f._baseline_bid_liquidity = quote_size
        if 0 < quote_size < (f._baseline_bid_liquidity * 0.30):
            self.transactions.cancel_order(ticket.order_id)
            f._entry_ticket = None
            f._baseline_bid_liquidity = 0

    def _update_trailing_stop(self, f):
        """Ratchet the stop of a TREND trade using the latest 15-minute extreme and ATR."""
        if f._entry_regime != "TREND" or f._stop_ticket is None:
            return
        if not f._m15_min.is_ready or not f._m15_max.is_ready or not f._m15_atr.is_ready:
            return
        offset = f._m15_atr.current.value * f._atr_mult
        tick = f._tick_size
        current_stop = f._stop_ticket.get(OrderField.STOP_PRICE)
        # Round to the tick grid BEFORE comparing; otherwise a sub-tick
        # improvement re-submits an identical stop price on every data slice.
        if f._trade_direction == "BUY":
            new_stop = round((f._m15_min.current.value - offset) / tick) * tick
            improved = new_stop > current_stop
        else:
            new_stop = round((f._m15_max.current.value + offset) / tick) * tick
            improved = new_stop < current_stop
        if improved:
            update = UpdateOrderFields()
            update.stop_price = new_stop
            f._stop_ticket.update(update)

    def _check_macro_decay(self, f, mapped):
        """Close a TREND position once the forecast has decayed below the exit level."""
        if f._trade_direction is None or f._entry_price == 0.0:
            return
        qty = self.portfolio[mapped].quantity
        if qty == 0:
            return
        if f._entry_regime != "TREND":
            return  # No CHOP handling needed
        forecast = f._continuous_forecast
        decayed = ((f._trade_direction == "BUY" and forecast < f._forecast_decay_threshold) or
                   (f._trade_direction == "SELL" and forecast > -f._forecast_decay_threshold))
        if not decayed:
            return
        # Hold on to the stop ticket first: the closing fill can be processed
        # inside market_order(), which resets f._stop_ticket to None and would
        # otherwise leave the stop order working with no position behind it.
        stop_ticket = f._stop_ticket
        self.market_order(mapped, -qty)
        if stop_ticket is not None:
            self.transactions.cancel_order(stop_ticket.order_id)
        self.debug(f"[{f.symbol}] TREND DECAY: {forecast:.2f}")

    def _handle_order_event(self, f, order_event):
        """Update trade state for a fill (roll, entry or close) or a dead entry order."""
        status = order_event.status
        if status == OrderStatus.CANCELED or status == OrderStatus.INVALID:
            # An entry order that died (kill-switch liquidation, rejection)
            # must release the ticket, or no further entry is ever allowed.
            if f._entry_ticket is not None and order_event.order_id == f._entry_ticket.order_id:
                f._entry_ticket = None
                f._baseline_bid_liquidity = 0
            return
        if status != OrderStatus.FILLED:
            return
        mapped = f.mapped
        if mapped is None:
            return
        fill_price = order_event.fill_price
        fill_qty = order_event.fill_quantity
        tick = f._tick_size
        # --- fill of the re-opening leg of a contract roll ---
        if f._roll_in_progress and order_event.symbol == f._roll_new_mapped and fill_qty == f._roll_old_quantity:
            stop = fill_price - f._pending_stop_dist if f._pending_side == "BUY" else fill_price + f._pending_stop_dist
            f._stop_ticket = self.stop_market_order(f._roll_new_mapped, -fill_qty, round(stop / tick) * tick)
            f._entry_price = fill_price
            f._roll_in_progress = False
            return
        # --- fill of the resting entry order ---
        if f._entry_ticket is not None and order_event.order_id == f._entry_ticket.order_id:
            if f._pending_stop_dist <= 0:
                self._reset_trade_state(f)
                return
            stop = fill_price - f._pending_stop_dist if f._pending_side == "BUY" else fill_price + f._pending_stop_dist
            f._stop_ticket = self.stop_market_order(mapped, -f._pending_qty, round(stop / tick) * tick)
            f._trade_direction = f._pending_side
            f._entry_price = fill_price
            f._initial_stop_dist = f._pending_stop_dist
            f._entry_regime = f._pending_regime
            f._active_target_price = f._pending_target_price
            f._entry_ticket = None
            return
        # --- fill that reduces/closes the open trade ---
        if f._trade_direction:
            is_close = (f._trade_direction == "BUY" and fill_qty < 0) or (f._trade_direction == "SELL" and fill_qty > 0)
            if is_close:
                mult = self.securities[mapped].symbol_properties.contract_multiplier
                points = fill_price - f._entry_price if f._trade_direction == "BUY" else f._entry_price - fill_price
                pnl = points * abs(fill_qty) * mult
                r = points / f._initial_stop_dist if f._initial_stop_dist > 0 else 0
                tag = "STOP HIT" if (f._stop_ticket and order_event.order_id == f._stop_ticket.order_id) else "CLOSE"
                self.debug(f"{tag} [{f._entry_regime}] {f.symbol} Dir:{f._trade_direction} Entry:{f._entry_price:.2f} Exit:{fill_price:.2f} Qty:{abs(fill_qty)} PnL:${pnl:.2f} R:{r:.2f}R")
                if self.portfolio[mapped].quantity == 0:
                    self._reset_trade_state(f)

    def _reset_trade_state(self, f):
        """Clear every field that describes an open trade or a pending roll."""
        f._trade_direction = None
        f._stop_ticket = None
        f._entry_ticket = None
        f._entry_price = 0.0
        f._initial_stop_dist = 0.0
        f._entry_regime = None
        f._pending_target_price = 0.0
        f._active_target_price = 0.0
        f._roll_in_progress = False
        f._roll_new_mapped = None
        f._roll_old_quantity = 0
        f._deferred_roll = False
        f._deferred_roll_old_mapped = None
        f._deferred_roll_new_mapped = None
        f._deferred_roll_old_quantity = 0
        f._deferred_roll_side = None
        f._deferred_roll_stop_dist = 0.0
        f._absorption_bars_since = 0

    # ---- Contract roll ----
    def _begin_roll(self, f, old_mapped, new_mapped, qty, side, stop_dist):
        """Arm the roll state, then swap ``qty`` contracts from old to new.

        The state is written BEFORE the orders go out because the fills can be
        delivered to ``_handle_order_event`` while ``market_order`` is still
        executing; that handler needs ``_roll_in_progress`` to place the stop.
        """
        f._roll_in_progress = True
        f._roll_new_mapped = new_mapped
        f._roll_old_quantity = qty
        f._pending_side = side
        f._pending_stop_dist = stop_dist
        f._pending_regime = f._entry_regime
        f._pending_target_price = f._active_target_price
        self.market_order(old_mapped, -qty)
        self.market_order(new_mapped, qty)

    def _process_deferred_roll(self, f, data):
        """Execute a postponed roll once the new contract has a bar in ``data``."""
        if not f._deferred_roll:
            return
        new = f._deferred_roll_new_mapped
        if new is None or not data.bars.contains_key(new):
            return
        # The contract we are leaving was recorded when the roll was deferred;
        # _last_known_mapped already points at the new contract by now.
        old = f._deferred_roll_old_mapped
        if old is None or not self.securities.contains_key(old):
            return
        side = f._deferred_roll_side
        stop_dist = f._deferred_roll_stop_dist
        # Use the live position: it may have been closed while the roll waited.
        q = self.portfolio[old].quantity
        f._deferred_roll = False
        f._deferred_roll_old_mapped = None
        if q == 0:
            self._reset_trade_state(f)
            return
        self._begin_roll(f, old, new, q, side, stop_dist)

    def _handle_contract_roll(self, f, new_mapped):
        """Cancel working orders and move any open position to ``new_mapped``.

        When the new contract has no data yet, the position move is postponed
        and picked up later by ``_process_deferred_roll``.
        """
        if not self.securities.contains_key(new_mapped):
            return
        old_mapped = f._last_known_mapped
        if old_mapped is None:
            return
        if f._entry_ticket is not None:
            self.transactions.cancel_order(f._entry_ticket.order_id)
            f._entry_ticket = None
        if f._stop_ticket is not None:
            self.transactions.cancel_order(f._stop_ticket.order_id)
            f._stop_ticket = None
        old_qty = self.portfolio[old_mapped].quantity
        if not self.securities[new_mapped].has_data:
            if old_qty != 0:
                f._deferred_roll = True
                f._deferred_roll_old_mapped = old_mapped
                f._deferred_roll_new_mapped = new_mapped
                f._deferred_roll_old_quantity = old_qty
                f._deferred_roll_side = "BUY" if old_qty > 0 else "SELL"
                f._deferred_roll_stop_dist = f._initial_stop_dist
            return
        if old_qty == 0:
            self._reset_trade_state(f)
            return
        side = "BUY" if old_qty > 0 else "SELL"
        self._begin_roll(f, old_mapped, new_mapped, old_qty, side, f._initial_stop_dist)

    def _find_future(self, symbol):
        """Return the tracked future whose continuous symbol is ``symbol``, else None."""
        for f in self._futures:
            if f.symbol == symbol:
                return f
        return None


# NOTE(review): this import was fused onto the end of the preceding
# `return None`; it opens a second, independent algorithm in this file.
from AlgorithmImports import *
class MedallionRegimeMasterAlgo(QCAlgorithm):
    """Long-only regime strategy on the Micro E-mini Nasdaq-100 future.

    Once per hour (on the minute-0 bar) the indicators are updated, the market
    is classified into a regime from ADX and ATR, and a bracketed long entry
    is attempted in the two trending regimes.
    """

    def initialize(self):
        """Configure the backtest, subscribe MNQ, build indicators and daily limits."""
        self.set_start_date(2023, 1, 1)
        self.set_end_date(2024, 1, 1)
        self.set_cash(50000)
        self.set_brokerage_model(BrokerageName.INTERACTIVE_BROKERS_BROKERAGE, AccountType.MARGIN)
        self.mnq = self.add_future(Futures.Indices.MICRO_NASDAQ_100_E_MINI,
                                   resolution=Resolution.MINUTE,
                                   data_normalization_mode=DataNormalizationMode.RAW,
                                   data_mapping_mode=DataMappingMode.OPEN_INTEREST,
                                   contract_depth_offset=0)
        self.mnq.set_filter(timedelta(0), timedelta(90))
        self.my_adx = AverageDirectionalIndex(14)
        self.my_atr = AverageTrueRange(14, MovingAverageType.SIMPLE)
        self.my_keltner = KeltnerChannels(20, 1.5, MovingAverageType.EXPONENTIAL)
        self.ema_200 = ExponentialMovingAverage(200)
        # NOTE(review): the indicators are updated manually in on_data, which
        # returns early while warming up, so this warm-up does not feed them.
        self.set_warm_up(200, Resolution.HOUR)
        self.atr_window = RollingWindow[float](50)
        self.my_atr.updated += self.on_atr_updated
        self.stop_loss_ticket = None
        self.take_profit_ticket = None
        self.daily_start_equity = self.portfolio.total_portfolio_value
        self.daily_loss_limit_pct = 2.0
        self.trading_halted_today = False
        self.daily_trades_taken = 0
        self.max_trades_per_day = 3
        self.schedule.on(self.date_rules.every_day(),
                         self.time_rules.midnight,
                         self.reset_daily_limits)
        self.settings.seed_initial_prices = True
        self.debug(f"Algorithm initialized at {self.time}")

    def on_atr_updated(self, sender, updated):
        """Record each ATR value in the rolling window once the ATR is ready."""
        if self.my_atr.is_ready:
            self.atr_window.add(updated.value)

    def reset_daily_limits(self):
        """Start a new trading day: reset the equity reference, halt flag and trade count."""
        self.daily_start_equity = self.portfolio.total_portfolio_value
        self.trading_halted_today = False
        self.daily_trades_taken = 0

    def on_data(self, data):
        """On each minute-0 bar update the indicators and evaluate an entry."""
        if self.is_warming_up:
            return
        mapped = self.mnq.mapped
        if mapped is None or not data.contains_key(mapped):
            return
        # Everything below runs once per hour, on the minute-0 bar only.
        if self.time.minute != 0:
            return
        bar = data[mapped]
        # NOTE(review): `bar` is the single minute bar at the top of the hour,
        # not an hourly aggregate — confirm that is the intended input.
        self.my_adx.update(bar)
        self.my_atr.update(bar)
        self.my_keltner.update(bar)
        self.ema_200.update(self.time, bar.close)
        if not self._indicators_ready():
            return
        if self.trading_halted_today:
            return
        if self._check_daily_loss_limit():
            return
        if not self._is_trading_session():
            return
        if self.portfolio.invested or self.daily_trades_taken >= self.max_trades_per_day:
            return
        current_price = bar.close
        regime = self.determine_regime(current_price)
        if self.time.hour % 2 == 0:
            self.debug(f"[{self.time}] Regime: {regime} | ADX: {self.my_adx.current.value:.1f} | Price: {current_price:.2f} | EMA200: {self.ema_200.current.value:.2f}")
        if regime == "High_Vol_Trend":
            self.run_high_vol_trend_logic(current_price, mapped)
        elif regime == "Low_Vol_Trend":
            self.run_low_vol_trend_logic(current_price, mapped)

    def determine_regime(self, current_price):
        """Classify the market as High_Vol_Trend, Low_Vol_Trend or Range_Regime.

        Trending means ADX above 20; high volatility means the current ATR is
        above the median of the recorded ATR window.

        Args:
            current_price: latest close; a non-positive value yields Range_Regime.

        Returns:
            One of ``"High_Vol_Trend"``, ``"Low_Vol_Trend"``, ``"Range_Regime"``.
        """
        if current_price <= 0:
            return "Range_Regime"
        atr_list = list(self.atr_window)
        if len(atr_list) < 2:
            return "Range_Regime"
        median_atr = sorted(atr_list)[len(atr_list) // 2]
        is_trending = self.my_adx.current.value > 20
        # Compare ATR with ATR. The window stores raw ATR points, so testing
        # an ATR-as-percent-of-price value against it mixed units and made the
        # high-volatility branch effectively unreachable.
        is_high_vol = self.my_atr.current.value > median_atr
        if not is_trending:
            return "Range_Regime"
        return "High_Vol_Trend" if is_high_vol else "Low_Vol_Trend"

    def run_high_vol_trend_logic(self, price, symbol):
        """Enter long on a close at/above the upper Keltner band and above the EMA-200."""
        upper = self.my_keltner.upper_band.current.value
        ema = self.ema_200.current.value
        if price >= upper and price > ema:
            atr = self.my_atr.current.value
            stop_price = price - (atr * 1.15)
            target_price = price + (atr * 4.5)
            size = self.calculate_volatility_adjusted_size(symbol, price, stop_price)
            if size > 0:
                self.execute_trade_with_bracket(symbol, size, stop_price, target_price, "PROD: High-Vol Long")

    def run_low_vol_trend_logic(self, price, symbol):
        """Enter long on a dip towards the Keltner middle band while above the EMA-200."""
        keltner_range = self.my_keltner.upper_band.current.value - self.my_keltner.lower_band.current.value
        dip_threshold = self.my_keltner.middle_band.current.value + (keltner_range * 0.2)
        ema = self.ema_200.current.value
        if price <= dip_threshold and price > ema:
            atr = self.my_atr.current.value
            stop_price = price - (atr * 1.0)
            target_price = price + (atr * 3.5)
            size = self.calculate_volatility_adjusted_size(symbol, price, stop_price)
            if size > 0:
                self.execute_trade_with_bracket(symbol, size, stop_price, target_price, "PROD: Low-Vol Long Dip")

    def execute_trade_with_bracket(self, symbol, size, stop_price, target_price, tag):
        """Send a market entry plus a stop-loss and a take-profit order for ``size``."""
        self.market_order(symbol, size, tag=tag)
        self.stop_loss_ticket = self.stop_market_order(symbol, -size, stop_price, tag=f"{tag}_SL")
        self.take_profit_ticket = self.limit_order(symbol, -size, target_price, tag=f"{tag}_TP")
        self.daily_trades_taken += 1
        self.debug(f"[{self.time}] {tag} | Size: {size} | Stop: {stop_price:.2f} | Target: {target_price:.2f}")

    def on_order_event(self, order_event):
        """When one leg of the bracket fills, cancel the other and drop both tickets."""
        if order_event.status != OrderStatus.FILLED:
            return
        take_profit = self.take_profit_ticket
        stop_loss = self.stop_loss_ticket
        if take_profit is not None and order_event.order_id == take_profit.order_id:
            if stop_loss is not None:
                stop_loss.cancel("Target reached.")
        elif stop_loss is not None and order_event.order_id == stop_loss.order_id:
            if take_profit is not None:
                take_profit.cancel("Stopped out.")
        else:
            return
        self.take_profit_ticket = None
        self.stop_loss_ticket = None

    def calculate_volatility_adjusted_size(self, symbol, entry_price, stop_loss_price, risk_pct=1.0):
        """Return the contract count that risks ``risk_pct`` % of equity, capped by margin.

        Args:
            symbol: contract to size.
            entry_price: intended entry price.
            stop_loss_price: intended stop price.
            risk_pct: percent of total portfolio value to risk on the trade.

        Returns:
            A non-negative integer number of contracts (0 when stop and entry
            coincide or nothing is affordable).
        """
        security = self.securities[symbol]
        capital_to_risk = self.portfolio.total_portfolio_value * (risk_pct / 100.0)
        trade_risk_in_points = abs(entry_price - stop_loss_price)
        if trade_risk_in_points == 0:
            return 0
        # Take the point value from the contract itself; fall back to the
        # previously hard-coded 2 if the multiplier is not positive.
        multiplier = security.symbol_properties.contract_multiplier
        if multiplier <= 0:
            multiplier = 2
        raw_quantity = int(capital_to_risk / (trade_risk_in_points * multiplier))
        margin_params = InitialMarginParameters(security, 1)
        margin_per_contract = security.buying_power_model.get_initial_margin_requirement(margin_params).value
        if margin_per_contract == 0:
            margin_per_contract = 1500.0
        max_contracts_allowed = int(self.portfolio.margin_remaining / margin_per_contract)
        return max(0, min(raw_quantity, max_contracts_allowed))

    def _indicators_ready(self):
        """Return True once the ATR window, EMA-200, ADX and Keltner are all ready."""
        return (self.atr_window.is_ready and
                self.ema_200.is_ready and
                self.my_adx.is_ready and
                self.my_keltner.is_ready)

    def _check_daily_loss_limit(self):
        """Liquidate and halt trading for the day once the daily loss limit is hit.

        Returns:
            True when the limit was hit on this call, otherwise False.
        """
        if self.daily_start_equity == 0:
            return False
        current_equity = self.portfolio.total_portfolio_value
        daily_return_pct = ((current_equity - self.daily_start_equity) / self.daily_start_equity) * 100
        if daily_return_pct > -self.daily_loss_limit_pct:
            return False
        self.liquidate()
        # The position is being closed here, so the bracket tickets no longer
        # describe anything; drop them rather than keep stale references.
        self.stop_loss_ticket = None
        self.take_profit_ticket = None
        self.trading_halted_today = True
        self.debug(f"[{self.time}] Daily loss limit hit: {daily_return_pct:.2f}%")
        return True

    def _is_trading_session(self):
        """Return True from 09:30 up to (not including) 16:00 algorithm time."""
        hour = self.time.hour
        if hour < 9 or hour >= 16:
            return False
        if hour == 9 and self.time.minute < 30:
            return False
        return True