Overall Statistics
Total Orders
124
Average Win
0.68%
Average Loss
-0.91%
Compounding Annual Return
-51.524%
Drawdown
34.700%
Expectancy
-0.160
Start Equity
10000000
End Equity
8348966.84
Net Profit
-16.510%
Sharpe Ratio
-0.846
Sortino Ratio
-1.169
Probabilistic Sharpe Ratio
12.010%
Loss Rate
52%
Win Rate
48%
Profit-Loss Ratio
0.74
Alpha
-0.462
Beta
-0.516
Annual Standard Deviation
0.418
Annual Variance
0.174
Information Ratio
-0.174
Tracking Error
0.82
Treynor Ratio
0.685
Total Fees
$2229.32
Estimated Strategy Capacity
$250000.00
Lowest Capacity Asset
SPY 31CV49FM3ZRZA|SPY R735QTJ8XC9X
Portfolio Turnover
5.55%
# region imports
from AlgorithmImports import *
from scipy.interpolate import SmoothBivariateSpline
import numpy as np
from collections import deque
from sklearn.decomposition import PCA
# endregion


"""
VolatilitySurfaceMomentum 
---------------------------------------------
Only emits log messages*when something material happens:
   • TRADE - a straddle/strangle was opened (prints the four features & κ)
   • HEDGE - delta hedge order fired (prints netâ€‘Δ and qty)
   • EXIT - position closed by TP / SL / time or option exercise

Research-derived signal:
   |feature|  quantile  |
   |---------|-----------|
   |level_change | 0.6448|
   |rolling_level| 0.6732|
   |smile_change | 0.5417|
   |skew_change  | 0.6163|

Direction:
   sign(Δlevel) ≥ 0   → long vol  (buy straddle)
   sign(Δlevel) < 0   → short vol (sell strangle) 
"""

class VolatilitySurfaceMomentum(QCAlgorithm):
   # ---------- 1. INITIALISATION -----------------------------
   def Initialize(self):
       self.SetStartDate(2020, 2, 1)
       self.SetEndDate(2020, 5, 1)
       self.SetCash(10_000_000)


       self.tickers  = ["AAPL", "TSLA", "NVDA", "AMZN", "SPY"]
       self.under    = {}
       self.optSym   = {}


       # --- IV‑surface storage
       self.spline    = {}
       self.splineHist= {t: deque(maxlen=5) for t in self.tickers}


       # --- rolling feature histories (for daily quantiles)
       self.hist = {f: {t: deque(maxlen=250) for t in self.tickers}
                    for f in ("level_change","rolling_level","skew_change","smile_change")}

       self.q = {
           "level_change" : 0.6448,
           "rolling_level": 0.6732,
           "smile_change" : 0.5417,
           "skew_change"  : 0.6163
       }

       # risk management
       self.maxTradesPerDay = 3
       self.tradesToday     = 0
       self.maxHold         = 20   # days
       self.tp, self.sl     = 0.40, -0.14
       self.deltaThresh     = 0.25

       # in‑memory position book {Symbol: {...}}
       self.positions = {}

       for t in self.tickers:
           eq = self.AddEquity(t, Resolution.Daily).Symbol
           op = self.AddOption(t, Resolution.Daily)
           op.SetFilter(lambda u: u.IncludeWeeklys().Strikes(-20,20).Expiration(10,45))
           op.PriceModel = OptionPriceModels.CrankNicolsonFD()
           self.under[t] = eq
           self.optSym[t]= op.Symbol

       # daily surface fit 10 min after open (maybe we should change this to 1hr before market *closes& to capture news and sentiment)
       self.SetWarmUp(timedelta(days=3))                                                        # for that day.
       self.Schedule.On(self.DateRules.EveryDay(),
                        self.TimeRules.AfterMarketOpen("SPY",10),
                        self.FitSurfaces)

   # ---------- 2. FIT SURFACES ----------------------------------------------
   def FitSurfaces(self):
       for t,sym in self.optSym.items():
           chain = self.OptionChain(sym)
           if not chain: continue
           c = [x for x in chain if x.ImpliedVolatility>0 and x.UnderlyingLastPrice>0]
           if len(c) < 15: continue
           try:
               K   = [x.Strike for x in c]
               S   = [x.UnderlyingLastPrice for x in c]
               M   = [k/s-1 for k,s in zip(K,S)]
               DTE = [(x.Expiry.date()-self.Time.date()).days for x in c]
               IV  = [x.ImpliedVolatility for x in c]
               spl = SmoothBivariateSpline(M,DTE,IV,kx=3,ky=3,s=25)
               self.spline[t]=spl
               self.splineHist[t].append(spl)
           except Exception as e:
               self.Error(f"{t} spline err {e}")

   # ---------- 3. ON DATA ----------------------------------------------------
   def OnData(self,slice:Slice):
       self.tradesToday = 0
       engaged = {p["underlying"] for p in self.positions.values()}

       for t in self.tickers:
           if t in engaged:                             continue
           if t not in self.spline or len(self.splineHist[t])<4: continue

           splines = list(self.splineHist[t])[-4:]
           m = np.linspace(-0.25,0.25,11)
           iv = [np.array([s.ev(x,30) for x in m]) for s in splines]

           try:
               pca=PCA(3)
               pca.fit(iv[:3])
               pc1=pca.transform([iv[1]])[0]
               pc2=pca.transform([iv[2]])[0]
               pc3=pca.transform([iv[3]])[0]
           except Exception as e:
               self.Error(f"{t} PCA {e}"); continue

           d   = pc2-pc1
           lvl,skw,sml = d
           self._updateHist(t,lvl,skw,sml)
           rollLvl = self._rolling(t)
           if rollLvl is None: continue
           if not self._passQuant(t,lvl,skw,sml,rollLvl): continue

           # optional curvature test
           κ = np.mean(np.gradient(np.gradient(iv[-1],m),m)-np.gradient(np.gradient(iv[-2],m),m))
           if abs(κ)<0.01: continue

           # ------- TRADE ----------------------------------------------------
           chain = slice.OptionChains.get(self.optSym[t]);  
           if not chain: continue
           liquid=[c for c in chain if c.AskPrice>0 and c.OpenInterest>10]
           if not liquid: continue
           spot  = liquid[0].UnderlyingLastPrice

           # decide structure & direction ------------------------------------
           longVol = lvl>=0
           structure = "straddle" if abs(sml)<abs(skw) else "strangle"
           dir = 1 if longVol else -1

           self.Debug(f"TRADE {t} {structure} dir={dir:+} lvlΔ={lvl:.4f} skwΔ={skw:.4f} smlΔ={sml:.4f} κ={κ:.4f}")

           if structure=="straddle":
               self._enterStraddle(liquid,spot,t,dir)
           else:
               self._enterStrangle(liquid,spot,t,dir)
           if self.tradesToday>=self.maxTradesPerDay: break

       self._manage(slice)

   # ---------- 4. FEATURE UTILS ---------------------------------------------
   def _updateHist(self,t,lvl,skw,sml):
       self.hist["level_change"][t].append(abs(lvl))
       self.hist["skew_change"][t].append(abs(skw))
       self.hist["smile_change"][t].append(abs(sml))
   def _rolling(self,t):
       h = self.hist["level_change"][t]
       if len(h)<3: return None
       r = np.mean(list(h)[-3:])
       self.hist["rolling_level"][t].append(r)
       return r
   def _passQuant(self,t,lvl,skw,sml,roll):
       def ok(f,val):
           q=np.quantile(self.hist[f][t],self.q[f]);return abs(val)>q
       return all([ok("level_change",lvl),ok("rolling_level",roll),ok("skew_change",skw),ok("smile_change",sml)])


   # ---------- 5. ORDER HELPERS ---------------------------------------------
   def _enterStraddle(self,contracts,spot,t,dir):
       if not self._canOpen(): return
       c=self._pick(contracts,spot,OptionRight.Call); p=self._pick(contracts,spot,OptionRight.Put)
       if not(c and p): return
       qty=self._size(); self.MarketOrder(c.Symbol,dir*qty); self.MarketOrder(p.Symbol,dir*qty)
       self._store(c,dir,t); self._store(p,dir,t); self.tradesToday+=1
       if dir==-1: self._hedge(t)
   def _enterStrangle(self,contracts,spot,t,dir):
       if not self._canOpen(): return
       c=self._pick(contracts,spot*1.05,OptionRight.Call); p=self._pick(contracts,spot*0.95,OptionRight.Put)
       if not(c and p): return
       qty=self._size(); self.MarketOrder(c.Symbol,dir*qty); self.MarketOrder(p.Symbol,dir*qty)
       self._store(c,dir,t); self._store(p,dir,t); self.tradesToday+=1
       if dir==-1: self._hedge(t)

   def _store(self,contract,dir,t):
       self.positions[contract.Symbol]={"entry_time":self.Time,"entry_price":contract.AskPrice,"direction":dir,"underlying":self.under[t]}

   # ---------- 6. DELTA HEDGING ---------------------------------------------
   def _hedge(self,t):
       net=sum(c.Greeks.Delta*self.Portfolio[c.Symbol].Quantity for ch in self.CurrentSlice.OptionChains.values() for c in ch if c.Symbol in self.positions and self.positions[c.Symbol]["underlying"]==self.under[t])
       qty=-int(round(net))
       if qty!=0:
           self.Debug(f"HEDGE {t} netΔ={net:.2f} qty={qty}")
           self.MarketOrder(self.under[t],qty)

   # ---------- 7. POSITION MANAGEMENT ---------------------------------------
   def _manage(self,slice):
       netΔ={t:0 for t in self.tickers}
       for sym,pos in list(self.positions.items()):
           if not self.Portfolio[sym].Invested:
               self.positions.pop(sym); continue
           d=(self.Time-pos["entry_time"]).days
           pnl=(self.Securities[sym].Price-pos["entry_price"])/pos["entry_price"]*pos["direction"]
           # accumulate delta
           for ch in slice.OptionChains.values():
               for c in ch:
                   if c.Symbol==sym:
                       netΔ[pos["underlying"].Value]+=c.Greeks.Delta*self.Portfolio[sym].Quantity
           # exit rules
           if pnl>=self.tp or pnl<=self.sl or d>self.maxHold:
               self.Debug(f"EXIT {sym.Value} pnl={pnl:.2%} days={d}")
               self.Liquidate(sym); self.positions.pop(sym,None)
       # portfolio hedge
       for t,Δ in netΔ.items():
           if abs(Δ)>self.deltaThresh:
               qty=-int(round(Δ)); self.Debug(f"HEDGE {t} netΔ={Δ:.2f} qty={qty}")
               self.MarketOrder(self.under[t],qty)

   # ---------- 8. UTILITIES --------------------------------------------------
   def _pick(self,contracts,target,right):
       pool=[c for c in contracts if c.Right==right and abs(c.Strike-target)<2]
       return max(pool,key=lambda x:x.OpenInterest) if pool else None
   def _canOpen(self,need=15_000):
       return self.Portfolio.MarginRemaining>need and len(self.positions)<self.maxTradesPerDay
   def _size(self):
       risk=self.Portfolio.TotalPortfolioValue*0.005; est=500
       return max(1,min(int(risk/est),50))