| Overall Statistics |
|
Total Trades 19 Average Win 1.27% Average Loss 0% Compounding Annual Return 34.468% Drawdown 7.700% Expectancy -0.3 Net Profit 18.829% Sharpe Ratio 2.124 Probabilistic Sharpe Ratio 79.914% Loss Rate 30% Win Rate 70% Profit-Loss Ratio 0 Alpha -0.009 Beta 0.965 Annual Standard Deviation 0.111 Annual Variance 0.012 Information Ratio -1.298 Tracking Error 0.014 Treynor Ratio 0.244 Total Fees $12.00 Estimated Strategy Capacity $77000000.00 Lowest Capacity Asset SPY YA9UN6J60YQU|SPY R735QTJ8XC9X Portfolio Turnover 2.62% |
#region imports
from AlgorithmImports import *
#endregion
from Selection.OptionUniverseSelectionModel import OptionUniverseSelectionModel
from datetime import date, timedelta
class OptionsUniverseSelectionModel(OptionUniverseSelectionModel):
def __init__(self, select_option_chain_symbols):
super().__init__(timedelta(1), select_option_chain_symbols)
def Filter(self, filter):
## Define options filter -- strikes +/- 3 and expiry between 0 and 180 days away
return (filter.Strikes(-2, +2)
.Expiration(timedelta(0), timedelta(180)))#region imports
from AlgorithmImports import *
#endregion
import datetime
import math
import json
from dataclasses import dataclass, asdict
from typing import List, Any
from QuantConnect.Securities.Option import OptionPriceModels
from datetime import timedelta
from QuantConnect.Data.UniverseSelection import *
SYMBOL = "SPY"
TARGET_CASH_UTILIZATION = 1
REBALANCE_THRESHOLD = 0.01
STARTING_CASH = 50000
TARGET_CONTRACT_DAYS_IN_FUTURE = 2
SHOULD_LOG = True
MIN_EXPIRY = 2
MAX_EXPIRY = 4
"""
1. Put everything into SPY
2. Trade covered call on weekly basis
3. If exercised, buy back into SPY
"""
# the time of backtesting is covered in the log
@dataclass
class EquitySummary:
total_value: float = -1
cash_value: float = -1
eq_buy_count: int = -1
eq_count: int = -1
eq_price: float = -1 # equity price
@dataclass
class OptionSummary:
op_count: int = -1
op_buy_count: int = -1
op_value_total: float = -1
op_inventory: List[Any] = None
class CoveredCallAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2023, 1, 1)
self.SetEndDate(2023, 8, 1)
self.SetCash(STARTING_CASH)
self.TargetDelta = float(self.GetParameter("target_delta"))
self.LimitOrderRatio = float(self.GetParameter("limit_order_ratio"))
self.InitialSpyValue = None
# Resolution.Daily, Resolution.Hour, Resolution.Minute
equity = self.AddEquity(SYMBOL, Resolution.Daily)
option = self.AddOption(SYMBOL, Resolution.Hour)
self.Log(f'option.Symbol: {str(option.Symbol)}')
self.symbol = option.Symbol
self.UniverseSettings.Resolution = Resolution.Daily
# Look for options that are within 30 days of the target contract date.
# Options should be at or 60 strikes above present price.
# timedelta(max(0, TARGET_CONTRACT_DAYS_IN_FUTURE - 30))
# timedelta(TARGET_CONTRACT_DAYS_IN_FUTURE + 30)
option.SetFilter(1, 10, timedelta(MIN_EXPIRY), timedelta(MAX_EXPIRY))
# option.PriceModel = OptionPriceModels.CrankNicolsonFD()
option.PriceModel = OptionPriceModels.BjerksundStensland()
# Give a generous warm-up to allow the options price model to initialize.
# self.SetWarmUp(TimeSpan.FromDays(10))
self.SetBenchmark(equity.Symbol)
self.LastDay = None
def OnData(self, slice):
"""Processes the slice up to 1x per day."""
if self.IsWarmingUp:
return
if self.LastDay == self.Time.day:
return
self.LastDay = self.Time.day
equity_summary = self.BalancePortfolio(slice)
option_summary = self.TradeOptions(slice)
if SHOULD_LOG:
output = {}
output.update(asdict(equity_summary))
output.update(asdict(option_summary))
self.Log(json.dumps(output))
def BalancePortfolio(self, slice):
"""Transacts the underlying symbol as needed to arrive at the target cash utilization.
Buys or sells as necessary to keep the underlying symbol's value at TARGET_CASH_UTILIZATION
of the total portfolio value. Does not transact unless the imbalance is greater than REBALANCE_THRESHOLD.
"""
equity_summary = EquitySummary()
equity_summary.eq_buy_count = 0
if SYMBOL in slice and slice[SYMBOL] is not None:
# Get the current value and plot to main figure window.
current_spy = slice[SYMBOL].Close
equity_summary.eq_price = current_spy
if self.InitialSpyValue is None:
self.InitialSpyValue = current_spy
self.Plot("Strategy Equity", "SPY", STARTING_CASH * current_spy / self.InitialSpyValue)
# Calculate how many shares should be owned to hit target cash utilization.
value = self.Portfolio.TotalPortfolioValue
actual_n_shares = self.Portfolio[SYMBOL].Quantity
desired_n_shares = math.floor(value / current_spy)
equity_summary.total_value = value
equity_summary.cash_value = self.Portfolio.Cash
equity_summary.eq_count = actual_n_shares
delta_shares = desired_n_shares - actual_n_shares
# TODO isn't ie better to just calculate the cash?
# Always buy into SPY
if delta_shares > 1:
self.MarketOrder(SYMBOL, delta_shares)
equity_summary.eq_buy_count = delta_shares
return equity_summary
def TradeOptions(self, slice):
"""If enough shares exist to use as collateral, sells a call and places a limit order for that call."""
option_summary = OptionSummary()
number_of_contracts_available = math.floor(self.Portfolio[SYMBOL].Quantity / 100)
number_of_contracts = 0
option_summary.op_inventory = []
for symbol, holding in self.Portfolio.items():
if holding.Invested and holding.Type==SecurityType.Option:
contract = self.Securities[symbol]
strike_price = contract.StrikePrice
expiration_date = contract.Expiry.strftime('%Y-%m-%d')
sell_cost = holding.HoldingsCost
option_summary.op_inventory.append({
# 'id': str(symbol),
'strike_price': strike_price,
'expiration_date': expiration_date,
'sell_cost': sell_cost,
})
number_of_contracts += -holding.Quantity
options_to_buy = number_of_contracts_available - number_of_contracts
option_summary.op_count = int(number_of_contracts)
option_summary.op_buy_count = 0
if options_to_buy < 1:
return option_summary
option_summary.op_buy_count = options_to_buy
"""
if SHOULD_LOG:
self.Log(f"I presently own {self.Portfolio[SYMBOL].Quantity} shares of {SYMBOL}.")
self.Log(f"I presently have {number_of_contracts_available} that I can hold and "
f"only own {number_of_contracts}. I plan to buy {options_to_buy} more")
"""
chain = slice.OptionChains.GetValue(self.symbol)
contract = self.GetClosestContract(chain, self.Time + datetime.timedelta(TARGET_CONTRACT_DAYS_IN_FUTURE))
if contract is None:
return option_summary
"""
if SHOULD_LOG:
self.Log(f"Selling contract: Price: {contract.AskPrice} Underlying Price: {contract.UnderlyingLastPrice:1.2f} "
f"Strike: {contract.Strike:1.2f} Expiry: {contract.Expiry} Delta: {contract.Greeks.Delta:1.2f}")
"""
self.Sell(contract.Symbol, options_to_buy)
# self.LimitOrder(contract.Symbol, options_to_buy, round(contract.AskPrice * self.LimitOrderRatio, 2))
return option_summary
def GetClosestContract(self, chain, desired_expiration):
"""Gets the contract nearest the target delta and expiry."""
if not chain:
self.Log('empty chain')
return None
calls = [contract for contract in chain if contract.Right == OptionRight.Call]
if not calls:
self.Log('empty calls')
return None
for call in calls:
self.Log(f'Expiry: {call.Expiry} price: {call.Strike} ask:{call.AskPrice} bid:{call.BidPrice}')
# TODO be specific in the expiration and strike price
# Calculate the option expiry date nearest the target.
available_expirations = list({contract.Expiry for contract in calls})
nearest_expiration = sorted(available_expirations, key=lambda expiration: abs(expiration-desired_expiration))[0]
# For all contracts that match the target expiration, find the one with delta nearest target.
calls_at_target_expiration = [contract for contract in calls if contract.Expiry == nearest_expiration]
calls_at_target_expiration = sorted(calls_at_target_expiration, key = lambda contract: abs(contract.Greeks.Delta - self.TargetDelta))
if not calls_at_target_expiration:
return None
return calls_at_target_expiration[0]