| Overall Statistics | Value |
|---|---|
| Total Orders | 270 |
| Average Win | 12.01% |
| Average Loss | -12.15% |
| Compounding Annual Return | -99.991% |
| Drawdown | 99.700% |
| Expectancy | -0.263 |
| Start Equity | 50000 |
| End Equity | 207.5 |
| Net Profit | -99.585% |
| Sharpe Ratio | -0.657 |
| Sortino Ratio | -1 |
| Probabilistic Sharpe Ratio | 0.005% |
| Loss Rate | 63% |
| Win Rate | 37% |
| Profit-Loss Ratio | 0.99 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 1.597 |
| Annual Variance | 2.551 |
| Information Ratio | -0.622 |
| Tracking Error | 1.597 |
| Treynor Ratio | 0 |
| Total Fees | $1262.50 |
| Estimated Strategy Capacity | $560000.00 |
| Lowest Capacity Asset | SPXW YUPH5DOXEOA6\|SPX 31 |
| Portfolio Turnover | 103.25% |
| Drawdown Recovery | 1 |
# region imports
from AlgorithmImports import *
from datetime import time, timedelta, datetime
from collections import deque
import math
# endregion
# --- Simple slippage model to keep paper fills realistic ---
class PercentSlippageModel:
    """Slippage model that charges a fixed fraction of the asset's price."""

    def __init__(self, pct: float):
        # Fraction of the reference price returned as slippage (0.01 -> 1%).
        self.pct = float(pct)

    def GetSlippageApproximation(self, asset, order):
        """Return ``reference_price * pct`` for ``asset``.

        The reference price is ``asset.Price``; when that is missing or not
        positive, the ``Price`` of ``asset.GetLastData()`` is used instead
        (if any data exists). ``order`` is not consulted.
        """
        quoted = asset.Price
        reference = float(quoted) if quoted else 0.0
        if reference <= 0:
            last_point = asset.GetLastData()
            if last_point is not None:
                reference = float(getattr(last_point, "Price", 0.0)) or 0.0
        return reference * self.pct
class SPX_NGreenCalls(QCAlgorithm):
def Initialize(self):
    """Configure the run: dates, cash, subscriptions, indicators, parameters.

    Optional project parameters are read through ``GetParameter``:
    ``green_bars`` (alias ``green``), ``use_ema_slope``, ``use_atr_filter``,
    ``atr_min``, ``use_breakout``, ``use_strong_body``, ``min_body_points``
    and ``use_delta_filter``. Flags accept ``1/true/yes/on`` (case
    insensitive); numeric values that fail to parse fall back to the
    defaults coded below.
    """
    truthy = ("1", "true", "yes", "on")

    def read_flag(name, default):
        """Parse an on/off parameter; ``default`` is the string used when unset."""
        return (self.GetParameter(name) or default).strip().lower() in truthy

    def read_float(name, default):
        """Parse a numeric parameter, returning ``default`` when it is not a number."""
        raw = (self.GetParameter(name) or str(default)).strip()
        try:
            return float(raw)
        except (TypeError, ValueError):
            return default

    # ----- Dates / cash / tz -----
    self.SetStartDate(2025, 1, 25)
    self.SetEndDate(2025, 8, 27)
    self.SetCash(50000)
    self.SetTimeZone("America/Los_Angeles")

    # Brokerage model
    self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)

    # Warmup (disabled in live to avoid IB history calls on SPX)
    if self.LiveMode:
        self.SetWarmup(0)
    else:
        self.SetWarmup(timedelta(minutes=60))

    # Slippage: must be registered before any security is added so that the
    # initializer sees the securities created below.
    self.SetSecurityInitializer(self._InitSecurity)

    # ========= DATA: SPX minute bars for signals =========
    self.index = None
    self.spx_symbol = None
    self._resolve_spx_subscription()

    # Trade SPXW (index options on SPX): same-day expiries, +/-80 strikes
    self.spxw = self.AddIndexOption("SPX", "SPXW", Resolution.Minute)
    self.spxw.SetFilter(lambda u: u.IncludeWeeklys().Expiration(0, 0).Strikes(-80, 80))

    # Rolling minute window for SPX bars
    self.window = deque(maxlen=40)

    # ===== Indicators =====
    self.ema_len = 8
    # The EMA can only be registered once the index is known; OnData creates
    # it later when the index is resolved from the option chain.
    self.ema8 = self.EMA(self.index.Symbol, self.ema_len, Resolution.Minute) if self.index else None
    self.ema8_vals = deque(maxlen=4)  # for slope check
    self.atr = None                   # created lazily in OnData

    # ----- PARAMS -----
    # N green bars (default 4, minimum 1)
    gb = self.GetParameter("green_bars") or self.GetParameter("green") or "4"
    try:
        self.green_bars_required = max(1, int(gb))
    except (TypeError, ValueError):
        self.green_bars_required = 4

    # EMA slope filter toggle (default OFF)
    self.use_ema_slope = read_flag("use_ema_slope", "0")

    # ATR filter toggle (default OFF) + threshold
    self.use_atr_filter = read_flag("use_atr_filter", "0")
    self.atr_min = read_float("atr_min", 0.8)

    # Overlay toggles (default ON) + minimum candle body in index points
    self.use_breakout = read_flag("use_breakout", "1")
    self.use_strong_body = read_flag("use_strong_body", "1")
    self.min_body_points = read_float("min_body_points", 0.5)

    # Delta filter toggle (default ON) and accepted delta band
    self.use_delta_filter = read_flag("use_delta_filter", "1")
    self.min_delta = 0.25
    self.max_delta = 0.40

    # ----- Session window (algorithm time zone, set to Pacific above) -----
    self.entry_start = time(6, 35)
    self.entry_end = time(10, 45)
    self.eod_exit = time(12, 45)

    # ----- Sizing & option quality -----
    self.trade_capital = 20000.0
    self.max_spread_abs = 0.45
    self.max_spread_pct = 0.30
    self.preferred_ask_low = 10.0
    self.preferred_ask_high = 20.0

    # ----- Exits / Risk -----
    self.full_tp_pct = 0.10            # sell ALL at +10%
    self.per_trade_pct_stop = 0.20     # exit at -20% from fill
    self.option_hard_stop_loss = 0.25
    self.base_max_trade_duration = timedelta(minutes=30)
    self.extended_trade_duration = timedelta(minutes=45)  # if unrealized >= +10%
    self.extend_on_gain_threshold = 0.10

    # Time-to-profit rail
    self.ttp_window = timedelta(minutes=10)
    self.ttp_target_pct = 0.022
    self.ttp_stop_pct = -0.12

    # ----- Daily rules -----
    self.max_trades_per_day = 2
    self.cooldown_normal = timedelta(minutes=5)
    self.cooldown_after_invalidation = timedelta(minutes=15)
    self.last_exit = None
    self.last_exit_reason = None
    self.current_day = None
    self.trades_today = 0
    self.daily_target_usd = 500.0
    self.daily_loss_cap_usd = -500.0
    self.start_of_day_equity = None
    self.allow_new_entries_today = True
    self.bad_exits_today = 0
    self.bad_exit_reasons = {"TimeToProfitStop", "TimeToProfitFail", "Invalidation_EMA8"}

    # Position state
    self.position_contract = None
    self.entry_time = None
    self.option_entry_avg = None
    self.entry_bar_grace = 2
    self.bars_since_entry = 0
    self.invalidation_hits = 0
    # Latch for the time-to-profit rail; defined here so readers never have
    # to probe for the attribute's existence.
    self.ttp_target_hit = False

    # Avoid strike reuse
    self.strike_last_used = {}
    self.strike_reuse_cooldown = timedelta(minutes=30)

    # Diagnostics
    self.diag_n3_true = 0
    self.diag_n4_true = 0
    self.diag_n3_only_preOverlay = 0

    # Labels & stats
    self.variant_label = f"{self.green_bars_required}Green"
    overlays_label = ("OFF" if (not self.use_breakout and not self.use_strong_body)
                      else "BOTH" if (self.use_breakout and self.use_strong_body)
                      else "BRK" if self.use_breakout else "BODY")
    self.SetRuntimeStatistic("Variant", self.variant_label)
    self.SetRuntimeStatistic("EMAFilter", "ON" if self.use_ema_slope else "OFF")
    self.SetRuntimeStatistic("ATRFilter", "ON" if self.use_atr_filter else "OFF")
    self.SetRuntimeStatistic("Overlays", overlays_label)
    self.SetRuntimeStatistic("ATRmin", f"{self.atr_min:.2f}")
    self.SetRuntimeStatistic("DeltaFilter", "ON" if self.use_delta_filter else "OFF")
    self.SetRuntimeStatistic("TradesToday", "0")

    # Logging
    self.log_every_bar = False
    self._last_log_min = None
    self._last_stat_min = None
# --- Security init / slippage ---
def _InitSecurity(self, sec: Security):
    """Attach a percentage slippage model to each newly added security.

    Options get 1% of price, the index gets 0.05%; other security types are
    left untouched.
    """
    # SPXW contracts are reported as SecurityType.IndexOption, which is a
    # distinct enum member from SecurityType.Option, so both are matched;
    # otherwise the traded contracts would never receive the slippage model.
    # NOTE(review): registering this callable via SetSecurityInitializer
    # presumably replaces the brokerage-model initializer — confirm intended.
    if sec.Type in (SecurityType.Option, SecurityType.IndexOption):
        sec.SetSlippageModel(PercentSlippageModel(0.01))
    elif sec.Type == SecurityType.Index:
        sec.SetSlippageModel(PercentSlippageModel(0.0005))
# ---------- SPX subscription resolver (live-safe) ----------
def _resolve_spx_subscription(self):
    """Subscribe to SPX minute bars and record the result on ``self``.

    Live: add ``"SPX"`` directly with no history probe; a failure is only
    logged. Backtest: try ``"^SPX"`` then ``"SPX"``, keeping the first
    ticker whose 5-bar minute history is non-empty and removing the others.
    On success ``self.index`` and ``self.spx_symbol`` are set; otherwise
    they are left as they were and a message is logged.
    """
    if self.LiveMode:
        try:
            live_index = self.AddIndex("SPX", Resolution.Minute)
            self.index = live_index
            self.spx_symbol = live_index.Symbol
            self.Debug("[SPX] Using 'SPX' (live, no history probe)")
        except Exception as e:
            self.Debug(f"[SPX] Live add failed: {e}")
        # Live mode never falls through to the history-probing path below.
        return

    for ticker in ("^SPX", "SPX"):
        try:
            candidate = self.AddIndex(ticker, Resolution.Minute)
            probe = self.History(candidate.Symbol, 5, Resolution.Minute)
            if probe is None or probe.empty:
                # No data behind this ticker: drop the subscription, try the next.
                self.RemoveSecurity(candidate.Symbol)
                continue
            self.index = candidate
            self.spx_symbol = candidate.Symbol
            self.Debug(f"[SPX] Using '{ticker}' for minute bars (bt)")
            return
        except Exception as e:
            self.Debug(f"[SPX] Failed '{ticker}': {e}")

    self.Debug("[SPX] Could not resolve in Initialize; will retry from option chain underlying in OnData")
# ---------- Utilities ----------
def _day_pnl(self):
if self.start_of_day_equity is None: return 0.0
return self.Portfolio.TotalPortfolioValue - self.start_of_day_equity
@staticmethod
def _safe_float(x, default=0.0):
if x is None: return default
try: return float(x)
except Exception:
try:
v = getattr(x, "Value", None)
if v is not None: return float(v)
except Exception: pass
if isinstance(x, (list, tuple)) and len(x) > 0:
try: return SPX_NGreenCalls._safe_float(x[0], default)
except Exception: pass
return default
def _dbg_gate(self, code, extra=""):
important = code in ("ENTRY","FILL","EXIT","TARGET_LOCK","HALT","SPX_WAIT")
if not self.log_every_bar and not important: return
m = self.Time.strftime("%H:%M")
if self._last_log_min == m and self.log_every_bar: return
self._last_log_min = m
self.Debug(f"[{self.Time:%H:%M}] {code} {extra}")
# ---------- Option quality / selection ----------
def _OptionOK(self, c):
if c is None: return False
bid = (c.BidPrice or 0); ask = (c.AskPrice or 0)
if bid <= 0 or ask <= 0: return False
mid = (bid + ask)/2.0
spr = ask - bid
if mid <= 0.50: return False
if spr > self.max_spread_abs: return False
if mid > 0 and spr/mid > self.max_spread_pct: return False
return True
def _DeltaOk(self, c):
if not self.use_delta_filter: return True
try:
g = c.Greeks
if not g or g.Delta is None or not math.isfinite(g.Delta): return True
return self.min_delta <= g.Delta <= self.max_delta
except Exception:
return True
def _PickContractForBudget(self, chain, u_price):
    """Choose the best same-day call from ``chain``, or None if none qualify.

    Eligible contracts expire on the current algorithm date, are calls, and
    pass ``_OptionOK`` and ``_DeltaOk``. Among those, the winner has the
    smallest penalty for an ask outside the preferred ask band; ties are
    broken by strike distance from ``u_price``.
    """
    today = self.Time.date()
    eligible = []
    for option in chain:
        if option.Expiry.date() != today:
            continue
        if option.Right != OptionRight.Call:
            continue
        if not self._OptionOK(option):
            continue
        if not self._DeltaOk(option):
            continue
        eligible.append(option)

    if not eligible:
        return None

    low, high = self.preferred_ask_low, self.preferred_ask_high

    def rank(option):
        # Lower tuples win: band penalty first, then distance to the money.
        ask_px = option.AskPrice or 0
        if ask_px <= 0:
            penalty = 999.0
        elif ask_px < low:
            penalty = (low - ask_px) * 5
        elif ask_px > high:
            penalty = (ask_px - high) * 5
        else:
            penalty = 0.0
        return (penalty, abs(option.Strike - u_price))

    return min(eligible, key=rank)
# ---------- N-green-bars build-up ----------
def _n_green_buildup(self, n:int):
if len(self.window) < n: return False
bars = list(self.window)[-n:]
if not all(b.Close > b.Open for b in bars): return False
for i in range(1, n):
if not (bars[i].Close > bars[i-1].Close): return False
return True
# ---------- Overlays ----------
def _twenty_min_high_break(self):
w = list(self.window)
if len(w) < 21: return False
prev20_high = max(b.High for b in w[-21:-1])
return w[-1].Close >= prev20_high
def _strong_body(self):
if len(self.window) == 0: return False
b = self.window[-1]
body = abs(b.Close - b.Open)
return body >= self.min_body_points
def _ema8_slope_up(self):
if len(self.ema8_vals) < 4: return False
return self.ema8_vals[-1] > self.ema8_vals[0]
# ---------- Main loop ----------
def OnData(self, slice: Slice):
    """Per-slice driver: bookkeeping, position management, then entry logic.

    Order of work: skip during warm-up; reset daily state on a date change;
    refresh heartbeat statistics once a minute; make sure the SPX index is
    subscribed; buffer the new SPX bar; manage an open position if any;
    otherwise run the daily/clock gates, the N-green-bars signal with its
    optional filters, and finally pick a same-day call and buy it.
    """
    if self.IsWarmingUp:
        self._dbg_gate("WARMUP")
        return

    # Daily reset
    today = self.Time.date()
    if getattr(self, "current_day", None) != today:
        self._reset_for_new_day(today)

    # Heartbeat/stat each minute
    minute = self.Time.strftime("%H:%M")
    if self._last_stat_min != minute:
        self._last_stat_min = minute
        self.SetRuntimeStatistic("DayPnL", f"{self._day_pnl():.0f}")
        self.SetRuntimeStatistic("HB", self.Time.strftime("%H:%M:%S"))

    # Resolve SPX from chain (live-safe)
    if self.index is None:
        self._resolve_index_from_chain(slice)

    # Need SPX bar
    if self.index is None:
        self._dbg_gate("SPX_WAIT", "no index")
        return
    if not slice.ContainsKey(self.index.Symbol):
        self._dbg_gate("NO_SPX_BAR")
        return

    bar = slice[self.index.Symbol]
    self.window.append(bar)
    if self.ema8 and self.ema8.IsReady:
        self.ema8_vals.append(self.ema8.Current.Value)
    if self.atr is None:
        self.atr = self.ATR(self.index.Symbol, 10, MovingAverageType.Simple, Resolution.Minute)
    self._dbg_gate("SPX_OK", f"o={bar.Open:.2f} h={bar.High:.2f} l={bar.Low:.2f} c={bar.Close:.2f}")

    # Manage open first; no new entries while invested
    if self.Portfolio.Invested:
        self.ManagePosition(slice)
        return

    if not self._entry_gates_open():
        return
    if not self._entry_signal():
        return

    # ===== Option chain =====
    chain = None
    try:
        if slice.OptionChains.ContainsKey(self.spxw.Symbol):
            chain = slice.OptionChains[self.spxw.Symbol]
    except Exception as e:
        # Keep the best-effort behaviour (no entry this bar) but say why.
        self.Debug(f"[CHAIN] Lookup failed: {e}")
    if not chain or len(chain) == 0:
        return

    # ===== Underlying price =====
    u_price = self._underlying_price(bar, chain)
    if u_price <= 0:
        return

    # ===== Pick & place order (CALLS ONLY) =====
    contract = self._PickContractForBudget(chain, u_price)
    if not contract:
        return

    strike_key = (contract.Right, contract.Strike)
    last_used = self.strike_last_used.get(strike_key)
    if last_used and (self.Time - last_used) < self.strike_reuse_cooldown:
        return

    if contract.Symbol not in self.Securities or not self.Securities[contract.Symbol].HasData:
        # Subscribe now; the entry is re-evaluated on a later bar.
        self.AddIndexOptionContract(contract.Symbol, Resolution.Minute)
        return

    ask = self._safe_float(getattr(self.Securities[contract.Symbol], "AskPrice", None))
    if ask <= 0:
        ask = self._safe_float(getattr(contract, "AskPrice", None))
    if ask <= 0:
        return

    cost_per = ask * 100.0
    cash_limit = self.Portfolio.Cash * 0.95
    spend_cap = min(self.trade_capital, cash_limit)
    qty = int(spend_cap // cost_per)
    # Allow a single contract above trade_capital as long as cash covers it.
    if qty < 1 and cost_per <= cash_limit:
        qty = 1
    if qty < 1:
        return

    self.bars_since_entry = 0
    self.invalidation_hits = 0
    tag = f"{self.green_bars_required}GreenRise"
    ticket = self.MarketOrder(contract.Symbol, qty, tag=tag)
    # Stamp the strike even on a rejection so it is not retried immediately.
    self.strike_last_used[strike_key] = self.Time
    if ticket is not None and ticket.Status == OrderStatus.Invalid:
        # A rejected order opened nothing: do not spend a daily trade slot
        # and do not report an ENTRY.
        self.Debug(f"[ORDER] {tag} rejected for {contract.Symbol.Value}; trade not counted")
        return

    self.trades_today += 1
    self.SetRuntimeStatistic("TradesToday", str(self.trades_today))
    self._dbg_gate("ENTRY", f"{tag} qty={qty} ask={ask:.2f} SPX≈{u_price:.2f}")

def _reset_for_new_day(self, day):
    """Clear per-day counters and buffers and republish the runtime statistics."""
    self.current_day = day
    self.trades_today = 0
    self.window.clear()
    self.strike_last_used.clear()
    self.invalidation_hits = 0
    self.bars_since_entry = 0
    self.start_of_day_equity = self.Portfolio.TotalPortfolioValue
    self.allow_new_entries_today = True
    self.bad_exits_today = 0
    self.SetRuntimeStatistic("TradesToday", "0")
    self.SetRuntimeStatistic("DayPnL", "0")
    overlays_label = ("OFF" if (not self.use_breakout and not self.use_strong_body)
                      else "BOTH" if (self.use_breakout and self.use_strong_body)
                      else "BRK" if self.use_breakout else "BODY")
    self.SetRuntimeStatistic("Variant", self.variant_label)
    self.SetRuntimeStatistic("EMAFilter", "ON" if self.use_ema_slope else "OFF")
    self.SetRuntimeStatistic("ATRFilter", "ON" if self.use_atr_filter else "OFF")
    self.SetRuntimeStatistic("Overlays", overlays_label)
    self.SetRuntimeStatistic("ATRmin", f"{self.atr_min:.2f}")
    self.SetRuntimeStatistic("DeltaFilter", "ON" if self.use_delta_filter else "OFF")
    self.Debug(f"[RESET] {self.current_day} trades_today reset to 0, Variant={self.variant_label}, "
               f"EMA={'ON' if self.use_ema_slope else 'OFF'}, ATR={'ON' if self.use_atr_filter else 'OFF'}, "
               f"Overlays={overlays_label}, ATRmin={self.atr_min:.2f}, DeltaFilter={'ON' if self.use_delta_filter else 'OFF'}")

def _resolve_index_from_chain(self, slice):
    """Subscribe to the index named by the option chain's underlying, if present."""
    try:
        if slice.OptionChains.ContainsKey(self.spxw.Symbol):
            chain = slice.OptionChains[self.spxw.Symbol]
            if getattr(chain, "Underlying", None):
                und_sym = chain.Underlying.Symbol
                idx = self.AddIndex(und_sym.Value, Resolution.Minute)
                self.index = idx
                self.spx_symbol = idx.Symbol
                if self.ema8 is None:
                    self.ema8 = self.EMA(self.index.Symbol, self.ema_len, Resolution.Minute)
                self.Debug(f"[SPX] Resolved from chain underlying: '{und_sym.Value}'")
    except Exception as e:
        self.Debug(f"[SPX] Chain-based resolve failed: {e}")

def _entry_gates_open(self):
    """Return True when the daily-risk and clock gates permit a new entry.

    Hitting the daily target, the daily loss cap or two "bad" exits latches
    ``allow_new_entries_today`` to False for the rest of the day.
    """
    # ======= DAILY GATES =======
    daypnl = self._day_pnl()
    if not self.allow_new_entries_today:
        return False
    if daypnl >= self.daily_target_usd:
        self.allow_new_entries_today = False
        self._dbg_gate("TARGET_LOCK", f"PnL {daypnl:.0f}")
        return False
    if daypnl <= self.daily_loss_cap_usd:
        self.allow_new_entries_today = False
        self._dbg_gate("HALT", f"Loss cap {daypnl:.0f}")
        return False
    if self.bad_exits_today >= 2:
        self.allow_new_entries_today = False
        self._dbg_gate("HALT", "Two strikes")
        return False

    # ======= TIME GATES =======
    now = self.Time.time()
    if now < self.entry_start or now > self.entry_end:
        return False
    if self.last_exit:
        if self.last_exit_reason == "Invalidation_EMA8":
            cool = self.cooldown_after_invalidation
        else:
            cool = self.cooldown_normal
        if self.Time - self.last_exit < cool:
            return False
    return self.trades_today < self.max_trades_per_day

def _entry_signal(self):
    """Update the 3-vs-4 green diagnostics and return True when an entry is signalled."""
    # Pre-overlay diagnostics: check 3 vs 4 green (independent of chosen N)
    n3 = self._n_green_buildup(3)
    n4 = self._n_green_buildup(4)
    if n3:
        self.diag_n3_true += 1
    if n4:
        self.diag_n4_true += 1
    if n3 and not n4:
        self.diag_n3_only_preOverlay += 1

    # Entry: N green bars with rising closes
    if not self._n_green_buildup(self.green_bars_required):
        return False

    # Filters: ATR (optional) + EMA (optional)
    if self.use_atr_filter:
        if not (self.atr and self.atr.IsReady and self.atr.Current.Value >= self.atr_min):
            return False
    if self.use_ema_slope and not self._ema8_slope_up():
        return False

    # Overlays: when at least one is enabled, any enabled overlay passing is
    # enough; with both disabled the entry is not gated.
    if self.use_breakout or self.use_strong_body:
        return bool((self.use_breakout and self._twenty_min_high_break())
                    or (self.use_strong_body and self._strong_body()))
    return True

def _underlying_price(self, bar, chain):
    """Return the SPX price for strike selection, or a value <= 0 if unknown.

    Uses the bar close first, then the chain's ``UnderlyingLastPrice``, then
    the first positive price-like attribute on ``chain.Underlying``.
    """
    u_price = float(bar.Close)
    if u_price <= 0:
        u_price = self._safe_float(getattr(chain, "UnderlyingLastPrice", None))
    if u_price <= 0 and getattr(chain, "Underlying", None) is not None:
        for attr in ("Price", "Close", "LastPrice", "Value"):
            u_price = self._safe_float(getattr(chain.Underlying, attr, None))
            if u_price > 0:
                break
    return u_price
# ---------- Fills ----------
def OnOrderEvent(self, orderEvent: OrderEvent):
    """Record position state when an entry order is completely filled.

    Only events with status ``Filled`` whose order tag contains
    ``"GreenRise"`` (the tag used for entries) are acted on; all other
    events, including the exit orders, are ignored here.
    """
    if orderEvent.Status != OrderStatus.Filled:
        return
    order = self.Transactions.GetOrderById(orderEvent.OrderId)
    if not order:
        return
    if "GreenRise" not in (order.Tag or ""):
        return

    sym = orderEvent.Symbol
    self.position_contract = sym
    self.entry_time = self.Time
    self.option_entry_avg = self.Portfolio[sym].AveragePrice
    self.bars_since_entry = 0
    # Start every trade with clean per-trade latches. _finish_exit normally
    # clears them, but a position that closed some other way would otherwise
    # leave stale values behind for this new trade.
    self.invalidation_hits = 0
    self.ttp_target_hit = False
    self._dbg_gate("FILL", f"{sym.Value} avg={self.option_entry_avg:.2f} ({self.variant_label})")
# ---------- Position management ----------
def ManagePosition(self, slice: Slice):
    """Evaluate the exit rules for the tracked option position.

    The position is marked at the bid/ask midpoint (last price when the
    quote is not two-sided). Exit checks run in this order and the first
    hit liquidates: full take-profit, percentage stop, time-to-profit rail,
    EMA8 invalidation (after the entry grace bars), time cap, hard stop,
    end-of-day. With the default settings the percentage stop (20%) always
    fires before the hard stop (25%), so the latter only matters if the
    thresholds are changed.

    When no usable mark or entry price exists, only the clock-based exits
    (end-of-day, base time cap) are applied and P&L is reported as 0.0.

    Args:
        slice: Current data slice (not read here; kept for the caller).
    """
    sym = self.position_contract
    if not sym:
        return

    sec = self.Securities[sym]
    bid = self._safe_float(getattr(sec, "BidPrice", None))
    ask = self._safe_float(getattr(sec, "AskPrice", None))
    last = self._safe_float(getattr(sec, "Price", None))
    mark = (bid + ask) / 2.0 if (bid > 0 and ask > 0) else last

    avg = self.option_entry_avg
    qty = abs(self.Portfolio[sym].Quantity)
    if qty <= 0:
        return

    held_for = (self.Time - self.entry_time) if self.entry_time else None

    if not (mark > 0) or not avg:
        # P&L cannot be computed, but a same-day option must still not be
        # held past the clock limits just because a quote is missing.
        if self.Time.time() >= self.eod_exit:
            self._finish_exit(sym, "EOD", 0.0)
        elif held_for is not None and held_for >= self.base_max_trade_duration:
            self._finish_exit(sym, "TimeCap", 0.0)
        return

    # Option prices are quoted per unit; dollars need the contract multiplier.
    # Falls back to 100, the multiplier the entry sizing assumes.
    props = getattr(sec, "SymbolProperties", None)
    multiplier = self._safe_float(getattr(props, "ContractMultiplier", None), 100.0)
    if not (multiplier > 0):
        multiplier = 100.0

    opt_pnl_pct = (mark - avg) / avg
    pnl_dollars = (mark - avg) * qty * multiplier

    # 1) Full take-profit
    if opt_pnl_pct >= self.full_tp_pct:
        self._finish_exit(sym, "FullTP10", pnl_dollars)
        return

    # 2) Percentage stop from the fill price
    pct_stop_price = avg * (1.0 - self.per_trade_pct_stop)
    if mark <= pct_stop_price:
        self._finish_exit(sym, "PerTradePctStop", pnl_dollars)
        return

    # 3) Time-to-profit rail: until the small target has been touched once,
    #    the trade must neither sink to the rail stop nor outlive the window.
    if not getattr(self, "ttp_target_hit", False):
        if opt_pnl_pct >= self.ttp_target_pct:
            self.ttp_target_hit = True
        else:
            if opt_pnl_pct <= self.ttp_stop_pct:
                self._finish_exit(sym, "TimeToProfitStop", pnl_dollars)
                return
            if held_for is not None and held_for >= self.ttp_window:
                self._finish_exit(sym, "TimeToProfitFail", pnl_dollars)
                return

    # 4) EMA8 invalidation, skipped for the first entry_bar_grace calls
    self.bars_since_entry = (self.bars_since_entry or 0) + 1
    if (self.bars_since_entry > self.entry_bar_grace
            and self.ema8 and self.ema8.IsReady and len(self.window) > 0):
        ema8 = self.ema8.Current.Value
        close_spx = float(self.window[-1].Close)
        breach_once = (close_spx < ema8 - 0.5)   # decisive break: exit at once
        breach_soft = (close_spx < ema8)         # marginal break: needs two in a row
        self.invalidation_hits = (self.invalidation_hits + 1) if breach_soft else 0
        if breach_once or self.invalidation_hits >= 2:
            self._finish_exit(sym, "Invalidation_EMA8", pnl_dollars)
            return

    # 5) Time cap, extended while the trade shows a sufficient gain
    if opt_pnl_pct >= self.extend_on_gain_threshold:
        max_dur = self.extended_trade_duration
    else:
        max_dur = self.base_max_trade_duration
    if held_for is not None and held_for >= max_dur:
        self._finish_exit(sym, "TimeCap", pnl_dollars)
        return

    # 6) Hard stop backstop
    if opt_pnl_pct <= -self.option_hard_stop_loss:
        self._finish_exit(sym, "OptionHardStopLoss", pnl_dollars)
        return

    # 7) End-of-day flat
    if self.Time.time() >= self.eod_exit:
        self._finish_exit(sym, "EOD", pnl_dollars)
        return
def _finish_exit(self, sym, reason, pnl_dollars):
self.Liquidate(sym, reason)
self.last_exit = self.Time
self.last_exit_reason = reason
if reason in self.bad_exit_reasons:
self.bad_exits_today += 1
self.trade_capital += pnl_dollars
daypnl = self._day_pnl()
if daypnl >= self.daily_target_usd:
self.allow_new_entries_today = False
self._dbg_gate("TARGET_LOCK", f"+{daypnl:.0f} reached after exit")
elif daypnl <= self.daily_loss_cap_usd:
self.allow_new_entries_today = False
self._dbg_gate("HALT", f"Loss cap hit {daypnl:.0f}")
self.position_contract = None
self.entry_time = None
self.option_entry_avg = None
self.invalidation_hits = 0
self.bars_since_entry = 0
self.ttp_target_hit = False
self._dbg_gate("EXIT", f"{reason} ({self.variant_label})")
def OnEndOfAlgorithm(self):
    """Log the final portfolio value, the active configuration and the diagnostics counters."""
    on_off = lambda enabled: "ON" if enabled else "OFF"

    if self.use_breakout and self.use_strong_body:
        overlays = "BOTH"
    elif self.use_breakout:
        overlays = "BRK"
    elif self.use_strong_body:
        overlays = "BODY"
    else:
        overlays = "OFF"

    summary = (
        f"Algorithm finished. Final PV: {self.Portfolio.TotalPortfolioValue} "
        f"({self.variant_label}, EMA={on_off(self.use_ema_slope)}, "
        f"ATR={on_off(self.use_atr_filter)}, "
        f"Overlays={overlays}, "
        f"ATRmin={self.atr_min:.2f}, DeltaFilter={on_off(self.use_delta_filter)})"
    )
    self.Debug(summary)

    diagnostics = (
        f"[DIAG] n3_true={self.diag_n3_true}, n4_true={self.diag_n4_true}, "
        f"n3_only_preOverlay={self.diag_n3_only_preOverlay}"
    )
    self.Debug(diagnostics)