| Overall Statistics |
|
Total Orders 4 Average Win 3.89% Average Loss -4.94% Compounding Annual Return -13.801% Drawdown 1.300% Expectancy -0.106 Start Equity 5000 End Equity 4938 Net Profit -1.240% Sharpe Ratio -8.48 Sortino Ratio -6.811 Probabilistic Sharpe Ratio 0.002% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 0.79 Alpha -0.137 Beta 0.076 Annual Standard Deviation 0.018 Annual Variance 0 Information Ratio 0.774 Tracking Error 0.101 Treynor Ratio -2.03 Total Fees $0.00 Estimated Strategy Capacity $1000.00 Lowest Capacity Asset TQQQ 32BINNWRRXQKM|TQQQ UK280CGTCB51 Portfolio Turnover 0.67% |
from AlgorithmImports import *
import numpy as np
from collections import defaultdict
class IBSVolatilityPendulum(QCAlgorithm):
"""
------------------------------------------------------------------------------------------------
Summary:
The Volatility Pendulum is an advanced options trading strategy
designed to capitalize on extreme Internal Bar Strength (IBS)
readings in the TQQQ ETF. By strategically selling Put Vertical
spreads during oversold conditions, this algorithm aims to generate
excess returns while managing downside risk.
Stratey Documentation:
https://docs.google.com/document/d/1rvoyZbaPbfEeT6bl4KbnbUrSK0f0em586Iu1wbK-Zoo/edit
Entry Signal:
- IBS < 0.2 (IBS = (Close - Low) / (High - Low))
Strike Selection Criteria
- Days to Expiration (DTE): 20-30 days
- Higher (Short) Put: 28-40 delta
- Lower (Long) Put: 1-2 strike steps below the short put
- Spread Selection: Optimize for best risk/reward profile
Position Sizing
- Position Sizing: 0.3 * Portfolio.availableBuyingPower
- Max Open Positions: 100
Exit Signals (first to occur):
- IBS > 0.8
- DTE ≤ 1
- Profit ≥ 25% of max potential profit
- Loss ≥ 25% of max potential loss
TODO:
- Pick strikes based on ATR instead of Delta
- Implement Stop Loss logic (underlyuing)
- Implement Stop Loss logic (pct of max loss)
- Money Management Open position risking 30% of available buying power
- Add tag for % of loss / profit for the spread
- Understand why this option exercise keeps happening, even with DTE exit
- 2023-04-22 00:00:00 TQQQ 230421P00024500 Buy Option Exercise
- When calculating DTEExit, it should account for weekends.
- Check margin requirements before attempting trade
- Include transaction fees when calculating max loss / profit
- Form and test hypotheses on time of day to enter
------------------------------------------------------------------------------------------------
"""
## Initialize the algo
## ------------------------
def Initialize(self):
self.InitAlgo()
self.InitParams()
self.InitData()
self.ScheduleRoutines()
## Init backtest details
## ------------------------
def InitAlgo(self):
self.ticker = "TQQQ" # Ticker symbol to trade
self.SetBenchmark("SPY") # Benchmark for reporting (buy and hold)
self.SetStartDate(2023, 9, 11) # Backtest start date
self.SetEndDate(2023, 10, 11) # Backtest end date
self.SetCash(5000) # Starting portfolio balance
## Init Local and external params
## --------------------------------
def InitParams(self):
self.exitMessage = ""
self.targetProfitPct = int(self.get_parameter("targetProfitPct"))
self.targetLossPct = int(self.get_parameter("targetLossPct"))
self.qty = int(self.get_parameter("qty"))
self.addToPositions = int(self.get_parameter("addToPositions")) == 1
self.ibsExitThresh = float(self.get_parameter("ibsExitThresh"))
self.ibsEnterThresh = float(self.get_parameter("ibsEnterThresh"))
self.tradeOptions = int(self.get_parameter("tradeOptions")) == 1
self.checkIBSExit = int(self.get_parameter("checkIBSExit")) == 1
self.dteExit = int(self.get_parameter("dteExit"))
# Filtering criteria for days to expiration (DTE)
self.maxDTEDays = int(self.get_parameter("minDTE")) + int(self.get_parameter("dteDiff"))
self.min_dte = timedelta(days=int(self.get_parameter("minDTE")))
self.max_dte = timedelta(days=(self.maxDTEDays))
# self.max_dte = timedelta(days=int(self.get_parameter("maxDTE")))
self.goShort = False
self.useComboOrders = True
self.lastDailyBar = None
## Init Data Feed, Conoslidators, etc
## -----------------------------------
def InitData(self):
# Subscrbe to a minute data feed (minute bars)
res = Resolution.Minute if self.tradeOptions else Resolution.Daily
equity = self.AddEquity(self.ticker, res)
self.symbol = equity.symbol
equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
# Set up the daily bar consolidator
self.dailyConsolidator = TradeBarConsolidator(timedelta(days=1))
self.dailyConsolidator.DataConsolidated += lambda _, dailyBar: setattr(self, 'lastDailyBar', dailyBar)
self.SubscriptionManager.AddConsolidator(self.symbol, self.dailyConsolidator)
if self.tradeOptions:
self.InitOptionsData()
## Init Options Data
## -----------------
def InitOptionsData(self):
self.chainLength = 100
option = self.AddOption(self.ticker, Resolution.Minute)
self.optionSymbol = option.Symbol
option.SetFilter(lambda universe: universe.IncludeWeeklys().Strikes(-self.chainLength//2, 0)\
.PutsOnly()\
.Expiration(self.min_dte, self.max_dte))
self.SetSecurityInitializer(CompositeSecurityInitializer(self.SecurityInitializer, FuncSecurityInitializer(self.CustomSecurityInitializer)))
## Schedule recurring logic (chron jobs)
## --------------------------------------
def ScheduleRoutines(self):
# Schedule a daily chron job to check for signals at the open
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.AfterMarketOpen(self.ticker, 5),
self.CheckForEntries)
if self.tradeOptions:
# # Schedule a daily chron job to check for exits every few minutes
self.Schedule.On(self.DateRules.EveryDay(self.ticker),
self.TimeRules.Every(timedelta(minutes=3)),
self.CheckForExits)
# Schedule a daily chron job to check for exit at the EoD
# self.Schedule.On(self.DateRules.EveryDay(), \
# self.TimeRules.AfterMarketOpen(self.ticker, 329),
# self.CheckForExits)
else:
# Schedule a daily chron job to check for signals at the open
self.Schedule.On(self.DateRules.EveryDay(), \
self.TimeRules.AfterMarketOpen(self.ticker, 6),
self.CheckForExits)
## Check for entries
## Called as soon as IBS signal is ready,
## At a scheduled time every day.
## --------------------------------------
def CheckForEntries(self):
if not self.DataIsReady(): return
if self.EntrySignalFired():
if( not self.Portfolio.Invested or self.addToPositions ):
if self.tradeOptions:
self.OpenOptionsTrade()
else:
self.SetHoldings(self.ticker, 1)
## Convenience: Check if Data is Ready
## -----------------------------------
def DataIsReady(self):
data = self.CurrentSlice
if self.IsWarmingUp or (self.ticker not in data ) or (data[self.ticker] is None):
self.Log("\t\tNo data, or warming up ")
return False
return True
## Check for exits
## Called intraday at a scheduled time.
## 1. IBS > 0.8
## 2. DTE≤1
## 3. Profit ≥ 25% of max potential profit
## 4. Loss ≥ 25% of max potential loss
## -------------------------------------------------------
def CheckForExits(self):
if not self.DataIsReady(): return
if not self.is_market_open(self.symbol): return
self.Plot(f"{self.ticker}: Price","Ticker Price",self.Securities[self.ticker].Price)
if self.Portfolio.Invested:
# General Exit 1. IBS threshold breached
# ----------------------------------------
if( self.checkIBSExit ):
if self.lastDailyBar is not None:
if(self.lastDailyBar.high != self.lastDailyBar.low):
ibsValue = (self.lastDailyBar.close - self.lastDailyBar.low) / (self.lastDailyBar.high - self.lastDailyBar.low)
if (ibsValue > self.ibsExitThresh ):
self.exitMessage = "IBS Exit Threshold breached. Exit all holdings"
self.Liquidate(tag=f"{self.exitMessage}")
return
## Handle Exits for options
## ------------------------
if self.tradeOptions:
## Check Individual Spreads for exit Criteria
## ---------------------------------------------------
spreads = self.GetPutCreditSpreadsInPortfolio()
priceInfo = f"{self.ticker} @ ${self.Securities[self.ticker].Price}"
for (short_position, long_position) in spreads:
currentValue = round(self.GetNetPremium(short_position.Symbol, long_position.Symbol),2)
curValLabel = f"[${currentValue} Spread Value]"
spreadLabel = f"{short_position.Symbol.ID.StrikePrice}~{long_position.Symbol.ID.StrikePrice}"
initial_premium = abs(short_position.AveragePrice) - abs(long_position.AveragePrice)
unrealizedPnL = round((initial_premium - currentValue),2)
iconLabel = "❌" if unrealizedPnL <= 0 else "✅"
pctOfMaxProfit = self.percent_max_profit_achieved(short_position, long_position)
pctOfMaxLoss = self.percent_max_loss_achieved(short_position, long_position)
roiPercentage = self.calculate_return_on_credit(short_position, long_position)
# Debug Plots
# ------------------------
# self.Plot(f"% Profit: {spreadLabel}","% max profit", pctOfMaxProfit )
# self.Plot(f"% PLoss: {spreadLabel}","% max loss", pctOfMaxLoss )
# self.Plot(f"Value: {spreadLabel}","Curr Premium", currentValue )
# self.Plot(f"ROI Pct: {spreadLabel}","roiPercentage", roiPercentage )
# Spread Exit 1. Expiration Date <= self.dteExit (backtested at 1 DTE)
# ------------------------------------------
expiry = short_position.Symbol.ID.Date
days_till_expiration = (expiry - self.Time).days
# Todo: try holding till the day of, if
# if days_till_expiration <= 1 and (roiPercentage <= 0):
if days_till_expiration <= self.dteExit:
self.exitMessage = f'[{iconLabel}][{spreadLabel}][DTE Exit] | {curValLabel} | {unrealizedPnL} PnL | {days_till_expiration} DTE <= {self.dteExit} {priceInfo}'
self.liquidate(short_position.Symbol, tag=self.exitMessage)
self.liquidate(long_position.Symbol, tag=self.exitMessage)
# Spread Exit 2. Profit ≥ X% of max potential profit
# --------------------------------------------------
# elif (pctOfMaxProfit >= 250):
elif (roiPercentage >= self.targetProfitPct):
self.exitMessage = f'[{iconLabel}][{spreadLabel}][Take Profit] | {curValLabel} | {unrealizedPnL} PnL | {roiPercentage}% ROI Profit ≥ {self.targetProfitPct}% of max potential profit {priceInfo}'
self.liquidate(short_position.Symbol, tag=self.exitMessage)
self.liquidate(long_position.Symbol, tag=self.exitMessage)
# Spread Exit 3. Loss ≥ X% of max potential loss
# -------------------------------------------------
elif (pctOfMaxLoss >= self.targetLossPct):
# elif (roiPercentage <= -25):
self.exitMessage = f'[{iconLabel}][{spreadLabel}][Stop Loss] | {curValLabel} | {unrealizedPnL} PnL | {roiPercentage}% ROI Loss ≥ {self.targetLossPct}% of max potential loss {priceInfo}'
self.liquidate(short_position.Symbol, tag=self.exitMessage)
self.liquidate(long_position.Symbol, tag=self.exitMessage)
# Spread Exit 4. Underlying below lower (long) strike
# -------------------------------------------------------
# elif (self.Securities[self.ticker].Price <= long_position.Symbol.ID.StrikePrice):
# self.exitMessage = f'[❌ Underlying Stop Loss] {spreadLabel} @ {self.Securities[self.ticker].Price} < {long_position.Symbol.ID.StrikePrice} | {priceInfo}'
# self.liquidate(short_position.Symbol, tag=self.exitMessage)
# self.liquidate(long_position.Symbol, tag=self.exitMessage)
## 3. Close any holdings of the Underlying, if any
## -----------------------------------------------
for symbol, holding in self.Portfolio.items():
# Check if the held position is not an option. If so, close position.
if (holding.Invested) and (holding.Type != SecurityType.Option):
self.Debug(f"Holding {symbol.Value} is a {holding.Type}. Liquidate.")
self.Liquidate(tag=f"Holding {symbol.Value} is a {holding.Type}")
## Go long when IBS < 0.2
## ------------------------------
def EntrySignalFired(self):
if self.lastDailyBar is not None:
if(self.lastDailyBar.high != self.lastDailyBar.low):
ibsValue = (self.lastDailyBar.close - self.lastDailyBar.low) / (self.lastDailyBar.high - self.lastDailyBar.low)
return (ibsValue < self.ibsEnterThresh )
return False
def CustomSecurityInitializer(self, security):
security.SetMarketPrice(self.GetLastKnownPrice(security))
if Extensions.IsOption(security.Symbol.SecurityType):
security.SetOptionAssignmentModel(NullOptionAssignmentModel())
security.SetFeeModel(ConstantFeeModel(0))
security.SetMarketPrice(self.GetLastKnownPrice(security))
security.SetFillModel(CustomFillModel())
## Open Options Trade
## ------------------------------
def OpenOptionsTrade(self):
slice = self.current_slice
gotBPS = False
# Get the OptionChain
chain = slice.OptionChains.get(self.optionSymbol, None)
if not chain: return
# Get the furthest expiration date of the contracts
expiry = sorted(chain, key = lambda x: x.Expiry, reverse=True)[0].Expiry
# Get ComboMarketOrders to trade. Doesnt Work with Tradier
pairs = self.GetPutPairs(chain)
topPair = self.GetTopRRRatioPair(pairs)
if topPair is None:
return
self.quit()
else:
short_put = topPair[0]
long_put = topPair[1]
# gotBPS, bps_strategy = self.OpenBullPutSpread(short_put.Strike, long_put.Strike, long_put.Expiry )
bps_strategy = OptionStrategies.BullPutSpread(self.optionSymbol, short_put.Strike, long_put.Strike, long_put.Expiry)
# If we've got contracts, trade them
if( bps_strategy is not None ):
if(self.useComboOrders):
prefix = "[++]" if self.Portfolio.Invested else "[+] "
spreadInfo = f"{bps_strategy.OptionLegs[0].Strike} ~ {bps_strategy.OptionLegs[1].Strike}"
priceInfo = f"{self.ticker} @ ${self.Securities[self.ticker].Price}"
costBasis = f"${round((short_put.BidPrice - long_put.AskPrice),2)} Received"
self.Buy(bps_strategy, self.qty, tag=f"{prefix} {spreadInfo} {priceInfo} | {costBasis}")
else:
self.Debug("Open position one leg at a time.")
self.Debug("Not implemented yet.")
self.quit()
return
else:
self.Debug(f"{self.Time} | OptionStrategies.BullPutSpread Failed")
## Get all possible put pairs that match our criteria:
## 1. Days to Expiration (DTE): 20-30 days
## 2. Strike Selection:
## 3. Higher (Short) Put: 28-40 delta
## 4. Lower (Long) Put: 1-2 strike steps below the short put
## -------------------------------------------------------
def GetPutPairs(self, chain):
from datetime import timedelta
# Current time
current_time = self.Time
# Filter all puts with expiration within the desired DTE range
eligible_puts = []
for i in chain:
if i.Right == OptionRight.Put:
dte = i.Expiry - current_time
if self.min_dte <= dte <= self.max_dte:
eligible_puts.append(i)
# Filter puts with delta between -0.40 and -0.28
high_puts = [put for put in eligible_puts if -0.40 <= put.Greeks.Delta <= -0.28]
# Get unique strike prices sorted
strike_prices = sorted(set(put.Strike for put in eligible_puts))
# Initialize list for put pairs
put_pairs = []
# Loop over each high_put to find suitable low_puts
for high_put in high_puts:
high_strike = high_put.Strike
expiry = high_put.Expiry
# Find the index of the high_put strike in the sorted strike prices list
high_strike_index = strike_prices.index(high_strike)
# Define possible lower strikes as 1 or 2 steps below the high strike
possible_low_strikes = strike_prices[max(0, high_strike_index - 2):high_strike_index]
# Find puts that match the criteria for lower strikes
low_puts = [put for put in eligible_puts if put.Expiry == expiry and put.Strike in possible_low_strikes]
# Create pairs of high and low puts
for low_put in low_puts:
put_pairs.append((high_put, low_put))
return put_pairs
## Get the Pair with the most favorable Reward/Risk ratio
## ------------------------------------------------------
def GetTopRRRatioPair(self, put_pairs):
# Initialize variables to keep track of the best pair and highest RR ratio
top_pair = None
highest_rr_ratio = float('-inf')
self.Debug("------------------------")
self.Debug(self.Time.strftime("%Y-%m-%d %H:%M:%S"))
self.Debug("Pairs Under Consideration:")
self.Debug("spread | collected credit | reward-risk ratio")
# Iterate through each put pair
for high_put, low_put in put_pairs:
# Calculate collected credit (difference in strikes)
collected_credit = round((high_put.BidPrice - low_put.AskPrice),2)
if( collected_credit < 0):
continue
# Calculate max loss (difference in strikes minus collected credit)
max_loss = (high_put.Strike - low_put.Strike) - collected_credit
max_loss = round(max_loss,2)
# ( ITM put strike price - OTM strke ) + (ITM put value at position opening (credit received) - OTM put value at position opening (debit paid)Contract multiplierTime of expiration
# Calculate RR ratio (collected credit divided by max loss)
if max_loss > 0: # Ensure max_loss is positive to avoid division by zero
rr_ratio = round((collected_credit / max_loss),2)
# Debug: Print RR ratio for the current pair
# self.Debug(f"RR Ratio for pair ({high_put.Strike}, {low_put.Strike}): {rr_ratio}")
# Update the top pair if the current RR ratio is higher than the highest found so far
if rr_ratio > highest_rr_ratio:
highest_rr_ratio = rr_ratio
top_pair = (high_put, low_put)
self.Debug(f" {high_put.Strike} ~ {low_put.Strike} | {collected_credit} / {max_loss}| RR:{rr_ratio}")
self.Debug("------------------------")
# Return the pair with the highest RR ratio
return top_pair
# Example usage within the same class:
# top_pair = self.GetTopRRRatioPair(put_pairs)
## Get all the PCS pairs in the portfolio
## Identify put credit spreads in the portfolio.
##
## :return: List of tuples, each containing (short_contract, long_contract)
## ------------------------------------------
def GetPutCreditSpreadsInPortfolio(self):
# Group put options by underlying symbol and expiration
put_options = defaultdict(lambda: defaultdict(list))
for holding in self.Portfolio.Values:
if holding.Type == SecurityType.Option and \
holding.Symbol.ID.OptionRight == OptionRight.Put and \
holding.Invested:
underlying = holding.Symbol.Underlying
expiry = holding.Symbol.ID.Date
put_options[underlying][expiry].append(holding)
credit_spreads = []
for underlying in put_options:
for expiry in put_options[underlying]:
# Sort puts by strike price, descending
sorted_puts = sorted(put_options[underlying][expiry],
key=lambda x: x.Symbol.ID.StrikePrice, reverse=True)
for i in range(len(sorted_puts) - 1):
higher_strike = sorted_puts[i]
lower_strike = sorted_puts[i + 1]
# Check if it's a valid put credit spread
if (higher_strike.Quantity < 0 and lower_strike.Quantity > 0 and
higher_strike.Symbol.ID.StrikePrice > lower_strike.Symbol.ID.StrikePrice):
credit_spreads.append((higher_strike, lower_strike))
return credit_spreads
## Calculate the percentage of max profit achieved for a put credit spread.
##
## :param short_position: The short put option position
## :param long_position: The long put option position
## :return: Percentage of max profit achieved (0-100)
## ------—------—------—------—------—------—------—------—------—------—
def percent_max_profit_achieved(self, short_position, long_position):
# Calculate initial and current net premiums
initial_net_premium = abs(short_position.AveragePrice) - abs(long_position.AveragePrice)
current_net_premium = self.GetNetPremium(short_position.Symbol, long_position.Symbol)
# current_net_premium = abs(self.Securities[short_position.Symbol].Price) - abs(self.Securities[long_position.Symbol].Price)
if initial_net_premium == 0:
pctOfMaxProfit = 0 # Avoid division by zero
else:
percent = (initial_net_premium - current_net_premium) / initial_net_premium * 100
pctOfMaxProfit = np.clip(percent, 0, 100) # Ensure result is between 0% and 100%
return pctOfMaxProfit
## Calculate the percentage of max loss achieved for a put credit spread.
##
## :param short_position: The short put option position
## :param long_position: The long put option position
## :return: Percentage of max loss achieved (0-100)
## ------------------------------------------------------------------------
def percent_max_loss_achieved(self, short_position, long_position):
# Calculate initial and current net premiums
initial_net_premium = abs(short_position.AveragePrice) - abs(long_position.AveragePrice)
current_net_premium = self.GetNetPremium(short_position.Symbol, long_position.Symbol)
# current_net_premium = abs(self.Securities[short_position.Symbol].Price) - abs(self.Securities[long_position.Symbol].Price)
# Calculate max loss
max_loss = (short_position.Symbol.ID.StrikePrice - long_position.Symbol.ID.StrikePrice) - initial_net_premium
if max_loss - initial_net_premium == 0:
pctOfMaxLoss = 0 # Avoid division by zero
else:
percent = (current_net_premium - initial_net_premium) / (max_loss - initial_net_premium) * 100
pctOfMaxLoss = np.clip(percent, 0, 100) # Ensure result is between 0% and 100%
return pctOfMaxLoss
## Calculate the return on the cost basis (credit received) of a put credit spread.
##
## :param short_position: The short put option position
## :param long_position: The long put option position
## :return: Percentage return on the credit received
## --------------------------------------------
def calculate_return_on_credit(self, short_position, long_position):
# Calculate the initial credit received (cost basis)
initial_credit = abs(short_position.AveragePrice) - abs(long_position.AveragePrice)
# Get the current cost to close the position
current_cost = self.GetNetPremium(short_position.Symbol, long_position.Symbol)
# Calculate the current profit/loss
profit_loss = initial_credit - current_cost
# Calculate the return as a percentage of the initial credit
if initial_credit == 0:
return 0 # Avoid division by zero
return_percentage = round(((profit_loss / initial_credit) * 100),3)
return return_percentage
## Calculate the current net premium of a put credit spread.
##
## :param short_symbol: Symbol of the short put option
## :param long_symbol: Symbol of the long put option
## :return: Current net premium of the spread
## ----------------------------------------------------------------
def GetNetPremium(self, short_symbol, long_symbol):
short_option = self.Securities[short_symbol]
long_option = self.Securities[long_symbol]
# Use ask price for short put (cost to buy back)
short_price = short_option.AskPrice
# Use bid price for long put (what we'd receive to sell)
long_price = long_option.BidPrice
# If ask or bid is zero or not available, fall back to last price
if short_price == 0:
short_price = short_option.Price
if long_price == 0:
long_price = long_option.Price
# Calculate net premium
net_premium = short_price - long_price
return abs(net_premium) # Return absolute value for consistency
######################################
## Custom Fill Model
## TODO: Evluate necessity
######################################
class CustomFillModel(FillModel):
def MarketFill(self, asset, order):
if order.Direction == OrderDirection.Buy:
fill_price = asset.AskPrice
else:
fill_price = asset.BidPrice
fill = super().MarketFill(asset, order)
fill.FillPrice = fill_price
return fill