| Overall Statistics |
|
Total Trades 252 Average Win 6.57% Average Loss -1.69% Compounding Annual Return 25.656% Drawdown 26.400% Expectancy 0.085 Net Profit 6.166% Sharpe Ratio 0.679 Probabilistic Sharpe Ratio 38.609% Loss Rate 78% Win Rate 22% Profit-Loss Ratio 3.88 Alpha 0.571 Beta -0.309 Annual Standard Deviation 0.74 Annual Variance 0.548 Information Ratio 0.372 Tracking Error 0.76 Treynor Ratio -1.628 Total Fees $0.00 Estimated Strategy Capacity $2800000.00 Lowest Capacity Asset SPXW Y7CLD98LDD2M|SPX 31 Portfolio Turnover 5.86% |
# region imports
from AlgorithmImports import *
# endregion
import math
class CalmYellowGreenBadger(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2023, 1, 1) # Set Start Date
self.SetEndDate(2023, 4, 7) # Set End Date
self.SetCash(10**6) # Set Strategy Cash
self.AdjustedCash = 1 * self.Portfolio.TotalPortfolioValue
self.name_cash = {}
self.assets = [["SPX", 5]]
#self.assets = [["SPX", 5], ["NDX", 10]]
for asset, strike_width in self.assets:
name_cash_total = self.AdjustedCash/len(self.assets)
name_savings = int(name_cash_total/2)
name_trading = int(name_cash_total/2)
self.name_cash[asset] = {"Savings": name_savings, "Trading": name_trading}
self.minimum_trade_account = self.name_cash[asset]["Savings"]
self.symbols = []
self.stop_loss_percentage = 0.20 # Percentage to stop loss relative to initial price
self.take_profit_percentage = 100 # Percentage to take profit relative to initial price
for asset, strike_width in self.assets:
#option = self.AddIndexOption(asset, Resolution.Minute)
index = self.AddIndex("SPX", Resolution.Minute).Symbol
option = self.AddIndexOption(index, "SPXW", Resolution.Minute)
option.SetFilter(lambda x: x.WeeklysOnly().Expiration(0, 0))
#option.SetFilter(-2, 2, 7, 7)
self.symbols.append([option.Symbol, strike_width, asset])
self.contract_prices = {}
def Get_Strike_Price(self, underlying, strike_width, strike_number)->float:
"""
Get the strike price for the given underlying price
Args:
underlying (float): Underlying price
strike_width (float): Strike width
strike_number (int): True if call, False if put. Defaults to True.
Returns:
float: Strike price
"""
strike = 0
if strike_number < 0:
strike_number += 1
strike = round(underlying + strike_width * strike_number - underlying % strike_width, 2)
return strike
def Buy_Calls_Puts(self, data)->None:
"""
Buy calls and puts for each asset in the asset list
Args:
data (Slice): Slice of data for the current time step
"""
for asset_symbol, strike_width, asset in self.symbols:
chain = data.OptionChains.get(asset_symbol)
trading = 0.1 * self.name_cash[asset]["Trading"]
if chain:
self.Debug(f"Underlying price of {asset_symbol}: {chain.Underlying.Price} - {self.name_cash[asset]}")
call = [x for x in chain if x.Right == OptionRight.Call]
put = [x for x in chain if x.Right == OptionRight.Put]
"""
self.Debug("All")
self.Debug("Calls")
for contract in call:
self.Debug(f"{contract.Symbol} {contract.AskPrice} {contract.Strike} {contract.Expiry} {contract.Expiry.date()} {self.Time.date()} {contract.Expiry.date()==self.Time.date()}")
self.Debug("Puts")
for contract in put:
self.Debug(f"{contract.Symbol} {contract.AskPrice} {contract.Strike} {contract.Expiry}")
"""
# look at the strikes closest to underlying (accounts for cases where strikes aren't continuous)
call_strike = self.Get_Strike_Price(chain.Underlying.Price, strike_width, 2)
put_strike = self.Get_Strike_Price(chain.Underlying.Price, strike_width, -2)
call = [x for x in call if x.Strike == call_strike]
put = [x for x in put if x.Strike == put_strike]
# checks for daily expiration
# call = [x for x in call if (x.Expiry.day - self.Time.day) == 0 and (x.Expiry.month == self.Time.month) == 0]
#put = [x for x in put if (x.Expiry.day - self.Time.day) == 0 and (x.Expiry.month == self.Time.month) == 0]
"""
self.Debug(f"Filtered {chain.Underlying.Price} {call_strike} {put_strike}")
self.Debug("Calls")
for contract in call:
self.Debug(f"{contract.Symbol} {contract.AskPrice} {contract.Strike} {contract.Expiry}")
self.Debug("Puts")
for contract in put:
self.Debug(f"{contract.Symbol} {contract.AskPrice} {contract.Strike} {contract.Expiry}")
"""
if len(call) and call[-1].AskPrice != 0:
# Buy call
call_contract = call[-1]
call_sizing = int(trading * 0.5/(call_contract.AskPrice * 100))
if call_sizing == 0:
continue
self.Buy(call_contract.Symbol, call_sizing)
self.Debug(f"Bought Call: {call_contract.Symbol} - Sizing: {call_sizing} - AskPrice: {call_contract.AskPrice} - Strike: {call_contract.Strike} - Underlying: {call_contract.UnderlyingLastPrice} - Expiration: {call_contract.Expiry}")
self.contract_prices[call_contract.Symbol] = {"Price": call_contract.AskPrice, "Sizing": call_sizing, "Asset": asset}
if len(put) and put[-1].AskPrice != 0:
# Buy put
put_contract = put[-1]
put_sizing = int(trading * 0.5/(put_contract.AskPrice * 100))
if put_sizing == 0:
continue
self.Buy(put_contract.Symbol, put_sizing)
self.Debug(f"Bought Put: {put_contract.Symbol} - Sizing: {put_sizing} - AskPrice: {put_contract.AskPrice} - Strike: {put_contract.Strike} - Underlying: {put_contract.UnderlyingLastPrice} - Expiration: {put_contract.Expiry}")
self.contract_prices[put_contract.Symbol] = {"Price": put_contract.AskPrice, "Sizing": put_sizing, "Asset": asset}
def GetOrderFee(self, symbol, quantity)->float:
"""
Args:
symbol (Symbol): Symbol of the security to get the order fee for
quantity (int): Quantity of the symbol
Returns:
float: Order fee for the given symbol and quantity
"""
security = self.Securities[symbol]
order = MarketOrder(security.Symbol, quantity, self.UtcTime)
parameters = OrderFeeParameters(security, order)
return security.FeeModel.GetOrderFee(parameters).Value.Amount
def CompoundingAndBalance(self)->None:
"""
Checks if the trading account has doubled and if so, compounds. If the trading account falls below savings,
it balances the trading account by moving money from the savings account to bring it up to the old savings
account size.
"""
for asset in self.name_cash:
if self.name_cash[asset]["Savings"]/self.name_cash[asset]["Trading"] >= 2:
increase = round(self.name_cash[asset]["Trading"] * 0.25, 2)
self.name_cash[asset]["Savings"] -= increase
self.name_cash[asset]["Trading"] += increase
elif self.name_cash[asset]["Trading"] < self.name_cash[asset]["Savings"]:
#difference = self.name_cash[asset]["Savings"] - self.name_cash[asset]["Trading"]
difference = self.minimum_trade_account - self.name_cash[asset]["Trading"]
if difference <= self.name_cash[asset]["Savings"]:
self.name_cash[asset]["Trading"] += difference
self.name_cash[asset]["Savings"] -= difference
elif self.name_cash[asset]["Trading"] > self.name_cash[asset]["Savings"]:
#difference = self.name_cash[asset]["Trading"] - self.name_cash[asset]["Savings"]
difference = self.name_cash[asset]["Trading"] - self.minimum_trade_account
self.name_cash[asset]["Trading"] -= difference
self.name_cash[asset]["Savings"] += difference
def SellContracts(self, contract, asset, total_initial_size, total_current_size)->None:
"""
Sell all contracts of a given asset and update the cash dictionary
Args:
contract (contract): contract to sell
asset (str): Name of the asset
total_initial_size (int): Total size of the contracts at purchase
total_current_size (int): Total size of the contracts at current time
"""
self.Debug(f"Before sell {self.Portfolio.TotalPortfolioValue}")
net = total_current_size - total_initial_size
before_sell_value = self.Portfolio.TotalPortfolioValue
quantity = int(self.Portfolio[contract].Quantity)
order_fee = self.GetOrderFee(contract, quantity)
#self.Debug(f"Fee: {order_fee}")
net -= order_fee
self.Liquidate(contract)
del self.contract_prices[contract]
after_sell_value = self.Portfolio.TotalPortfolioValue
net += after_sell_value - before_sell_value
self.name_cash[asset]["Trading"] += net
self.Debug(f"After sell {self.Securities[contract].Symbol} {self.Portfolio.TotalPortfolioValue} - Total Net: {net} - Trading {asset}: {self.name_cash[asset]}")
self.CompoundingAndBalance()
def GetContractPosition(self, contract)->int:
"""
Get the total initial size and total current size of the contracts for the given asset
Args:
contract (contract): contract to sell
Returns:
int, int: Total initial size and total current size of the contracts for the given asset
"""
total_initial_size = 0
total_current_size = 0
# optionContract = self.Securities[contract].Symbol
# underlying = self.Securities[contract.Underlying].Price
quantity = int(self.Portfolio[contract].Quantity)
lastPrice = self.Securities[contract].Price
# profits = round(self.Portfolio[contract].UnrealizedProfit,0)
# profit_percentage = self.Portfolio[contract].UnrealizedProfitPercent
priceBought = self.contract_prices[contract]["Price"]
originalSizing = self.contract_prices[contract]["Sizing"]
total_initial_size += priceBought * 100 * quantity
total_current_size += lastPrice * 100 * quantity
return total_initial_size, total_current_size
def NameStopLossAndProfit(self)->None:
"""
Checks if the trading account has fallen below the stop loss percentage and if so, sells all contracts.
Also checks if the trading account has reached the take profit percentage and if so, sells all contracts.
"""
option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.IndexOption]
# Check for combined loss
for contract in option_invested:
total_initial_size, total_current_size = self.GetContractPosition(contract)
asset = self.contract_prices[contract]["Asset"]
#self.Debug(f"Status {contract}: {total_initial_size} {total_current_size}")
if total_current_size/total_initial_size >= self.take_profit_percentage:
self.Debug(f"Sold positive: - Initial Size: {total_initial_size} - Current Size: {total_current_size} - Status: {total_current_size/total_initial_size}")
self.SellContracts(contract, asset, total_initial_size, total_current_size)
elif total_current_size/total_initial_size < self.stop_loss_percentage:
self.Debug(f"Stop loss - Initial $: {total_initial_size} - Current $: {total_current_size} - Net: {total_current_size-total_initial_size} - Status: {total_current_size/total_initial_size}")
self.SellContracts(contract, asset, total_initial_size, total_current_size)
def OnData(self, data: Slice):
if self.Portfolio.Invested:
self.NameStopLossAndProfit()
if self.Portfolio.Invested:
if self.Time.hour == 15 and self.Time.minute == 45:
self.Debug(f"Date Liquidate: {self.Time}")
option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.IndexOption]
# Organize the contracts by asset
for contract in option_invested:
total_initial_size, total_current_size = self.GetContractPosition(contract)
asset = self.contract_prices[contract]["Asset"]
self.Debug(f"Sold {asset}- Initial $: {total_initial_size} - Current $: {total_current_size} - Net: {total_current_size-total_initial_size} - Status: {total_current_size/total_initial_size}")
self.SellContracts(contract, asset, total_initial_size, total_current_size)
if self.Time.hour == 10 and self.Time.minute == 5:
self.Debug(f"Date Buy: {self.Time}")
self.Buy_Calls_Puts(data)