| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 3.86 Tracking Error 0.038 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
#region imports
from AlgorithmImports import *
import numpy as np
# This code implements the pairs trading algorithm detailed in the lecture notes. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.lookback = 20
self.k = 2
self.SetStartDate(2018, 1, 1)
self.SetEndDate(2019, 1, 1)
self.cash = 1000000
self.SetCash(self.cash)
self.SetWarmup(self.lookback + 10)
self.AddEquity("AMD", Resolution.Daily)
self.AddEquity("JNJ", Resolution.Daily)
self.amd_history = []
self.jnj_history = []
#self.bollinger_amd = BollingerBands("AMD", 20, 2, MovingAverageType.Simple, Resolution.Daily)
#self.bollinger_jnj = BollingerBands("JNJ", 20, 2, MovingAverageType.Simple, Resolution.Daily)
def OnData(self, data):
amd_px = self.Securities["AMD"].Price
jnj_px = self.Securities["JNJ"].Price
self.amd_history.insert(0, amd_px)
self.jnj_history.insert(0, jnj_px)
if len(self.amd_history) > self.lookback:
self.amd_history.pop()
self.jnj_history.pop()
if self.IsWarmingUp:
return
amd_mean = np.mean(self.amd_history)
amd_std = np.std(self.amd_history)
jnj_mean = np.mean(self.jnj_history)
jnj_std = np.std(self.jnj_history)
amd_pos = self.Portfolio['AMD'].HoldingsValue / self.cash
jnj_pos = self.Portfolio['JNJ'].HoldingsValue / self.cash
self.Plot("AMD", "Px", amd_px)
self.Plot(f"AMD", "BB-{self.k}", amd_mean - self.k * amd_std)
self.Plot("AMD", "BB", amd_mean)
self.Plot(f"AMD", "BB+{self.k}", amd_mean + self.k * amd_std)
self.Plot("AMD", "Pos", amd_pos)
self.Plot("JNJ", "Px", jnj_px)
self.Plot(f"JNJ", "BB-{self.k}", jnj_mean - self.k * jnj_std)
self.Plot("JNJ", "BB", jnj_mean)
self.Plot(f"JNJ", "BB+{self.k}", jnj_mean + self.k * jnj_std)
self.Plot("JNJ", "Pos", jnj_pos)
if not self.Portfolio["AMD"].IsLong and not self.Portfolio["AMD"].IsShort:
if amd_px > amd_mean + self.k * amd_std:
self.SetHoldings("AMD", -0.5)
elif amd_px < amd_mean - self.k * amd_std:
self.SetHoldings("AMD", 0.5)
elif self.Portfolio["AMD"].IsLong:
if amd_px > amd_mean:
self.SetHoldings("AMD", 0)
elif self.Portfolio["AMD"].IsShort:
if amd_px < amd_mean:
self.SetHoldings("AMD", 0)
if not self.Portfolio["JNJ"].IsLong and not self.Portfolio["JNJ"].IsShort:
if jnj_px > jnj_mean + self.k * jnj_std:
self.SetHoldings("JNJ", -0.5)
elif jnj_px < jnj_mean - self.k * jnj_std:
self.SetHoldings("JNJ", 0.5)
elif self.Portfolio["JNJ"].IsLong:
if jnj_px > jnj_mean:
self.SetHoldings("JNJ", 0)
elif self.Portfolio["JNJ"].IsShort:
if jnj_px < jnj_mean:
self.SetHoldings("JNJ", 0)
# region imports
from AlgorithmImports import *
# endregion
# Code Re-used from previous homeworks.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.ticker = "XOM" #"GS"
self.SetStartDate(2019, 2, 1) # Set Start Date
self.SetEndDate(2021, 2, 1)
#self.SetCash(1000000) # Set Strategy Cash
self.AddEquity(self.ticker, Resolution.Daily)
self.POS = 1
self.init_run = True
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
print("Placing Market ORder")
self.MarketOrder(self.ticker, self.POS)
# region imports
from AlgorithmImports import *
# endregion
# Code Re-used from previous homeworks.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.ticker = "XOM" #"GS"
self.SetStartDate(2019, 2, 1) # Set Start Date
self.SetEndDate(2021, 2, 1)
#self.SetWarmup(100)
#self.SetCash(1000000) # Set Strategy Cash
self.AddEquity(self.ticker, Resolution.Daily)
#self.SetHoldings(self.ticker, 1)
self.POS = 1
self.init_run = True
self.sold = False
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
self.Debug("Placing Market ORder")
self.MarketOrder(self.ticker, self.POS)
self.start_px = self.Securities[self.ticker].Price
self.Debug(f"Starting Price = {self.start_px}")
else:
if not self.sold:
px = self.Securities[self.ticker].Price
if px < self.start_px*0.93 or px > self.start_px*1.07:
# Sell:
self.MarketOrder(self.ticker, -self.POS)
self.sold = True
# region imports
from AlgorithmImports import *
# endregion
# Code Re-used from previous homeworks.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 2, 1) # Set Start Date
self.SetEndDate(2021, 2, 1)
#self.SetWarmup(100)
self.SetCash(1000000) # Set Strategy Cash
self.AddEquity("GS", Resolution.Daily)
self.AddEquity("MS", Resolution.Daily)
self.AddEquity("AMD", Resolution.Daily)
self.AddEquity("XOM", Resolution.Daily)
#self.SetHoldings(self.ticker, 1)
self.init_run = True
self.sold = False
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
self.Debug("Placing Market ORder")
self.MarketOrder("GS", 500000 / self.Securities["GS"].Price)
self.MarketOrder("MS", -500000 / self.Securities["MS"].Price)# region imports
from AlgorithmImports import *
# endregion
# Code Re-used from previous homeworks.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 2, 1) # Set Start Date
self.SetEndDate(2021, 2, 1)
#self.SetWarmup(100)
self.SetCash(1000000) # Set Strategy Cash
self.AddEquity("GS", Resolution.Daily)
self.AddEquity("MS", Resolution.Daily)
self.AddEquity("AMD", Resolution.Daily)
self.AddEquity("XOM", Resolution.Daily)
#self.SetHoldings(self.ticker, 1)
self.init_run = True
self.sold = False
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
self.Debug("Placing Market ORder")
# Long
self.MarketOrder("GS", 500000 / self.Securities["GS"].Price)
self.MarketOrder("AMD", 500000 / self.Securities["AMD"].Price)
# Short
self.MarketOrder("MS", -500000 / self.Securities["MS"].Price)
self.MarketOrder("XOM", -500000 / self.Securities["XOM"].Price)# region imports
from AlgorithmImports import *
# endregion
# Code Re-used from previous homeworks.
# Referenced "simple_momentum_example.txt" on Sakai. Approach based on this reference. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 8, 20) # Set Start Date
self.SetEndDate(2020, 7, 20)
#self.SetWarmup(100)
self.SetCash(2000000) # Set Strategy Cash
self.SetWarmup(20)
self.AddEquity("AMD", Resolution.Daily)
self.AddEquity("AMZN", Resolution.Daily)
self.AddEquity("ROKU", Resolution.Daily)
self.AddEquity("JPM", Resolution.Daily)
self.trend_following_security = "AMZN"
self.sma20 =self.SMA(self.trend_following_security, 20, Resolution.Daily)
def OnData(self, data: Slice):
# Don't do anything during warmup period.
if self.IsWarmingUp:
return
# follow the AMZN
if self.Securities[self.trend_following_security].Price > self.sma20.Current.Value:
self.SetHoldings(self.trend_following_security, 0.5)
if self.Securities[self.trend_following_security].Price < self.sma20.Current.Value:
self.SetHoldings(self.trend_following_security, -0.5)# region imports
from AlgorithmImports import *
# endregion
# Code Re-used from previous homeworks.
# Referenced "simple_momentum_example.txt" on Sakai. Approach based on this reference. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 8, 20) # Set Start Date
self.SetEndDate(2020, 7, 20)
#self.SetWarmup(100)
self.SetCash(2000000) # Set Strategy Cash
self.SetWarmup(20)
self.AddEquity("AMD", Resolution.Daily)
self.AddEquity("AMZN", Resolution.Daily)
self.AddEquity("ROKU", Resolution.Daily)
self.AddEquity("JPM", Resolution.Daily)
self.trend_reversal_security = "ROKU"
self.sma20 =self.SMA(self.trend_reversal_security, 20, Resolution.Daily)
def OnData(self, data: Slice):
# Skip execution during warmup
if self.IsWarmingUp:
return
if self.Securities[self.trend_reversal_security].Price > self.sma20.Current.Value:
self.SetHoldings(self.trend_reversal_security, -0.5)
if self.Securities[self.trend_reversal_security].Price < self.sma20.Current.Value:
self.SetHoldings(self.trend_reversal_security, 0.5)# region imports
from AlgorithmImports import *
# endregion
# Code Re-used from previous homeworks.
# Referenced "simple_momentum_example.txt" on Sakai. Approach based on this reference. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 8, 20) # Set Start Date
self.SetEndDate(2020, 7, 20)
#self.SetWarmup(100)
self.SetCash(2000000) # Set Strategy Cash
self.SetWarmup(20)
self.AddEquity("AMD", Resolution.Daily)
self.AddEquity("AMZN", Resolution.Daily)
self.AddEquity("ROKU", Resolution.Daily)
self.AddEquity("JPM", Resolution.Daily)
self.trend_following_security = "AMZN"
self.trend_reversal_security = "ROKU"
self.follow_sma20 =self.SMA(self.trend_following_security, 20, Resolution.Daily)
self.reverse_sma20 =self.SMA(self.trend_reversal_security, 20, Resolution.Daily)
def OnData(self, data: Slice):
# Do nothign during warmup period
if self.IsWarmingUp:
return
# follow the AMZN
if self.Securities[self.trend_following_security].Price > self.follow_sma20.Current.Value:
self.SetHoldings(self.trend_following_security, 0.25)
if self.Securities[self.trend_following_security].Price < self.follow_sma20.Current.Value:
self.SetHoldings(self.trend_following_security, -0.25)
# Go against trend on ROKU
if self.Securities[self.trend_reversal_security].Price > self.reverse_sma20.Current.Value:
self.SetHoldings(self.trend_reversal_security, -0.25)
if self.Securities[self.trend_reversal_security].Price < self.reverse_sma20.Current.Value:
self.SetHoldings(self.trend_reversal_security, 0.25)#region imports
from AlgorithmImports import *
#endregion
# This code is modified starting from code on Sakai authored by Ye et. al. Credit to Prof. Ye for its authorship. Retrieved from Sakai 2/15/2023
# Result for Jan 1 2018: ['BAC', 'BRK.B', 'JPM']
# Bank of America
# Berkshire Hathaway
# JPMorgan
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2018, 1, 1)
self.SetEndDate(2018, 1, 2)
self.AddUniverse(self.CoarseSelectionFilter, self.FineSelectionFilter)
self.UniverseSettings.Resolution = Resolution.Daily
self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))
def CoarseSelectionFilter(self, coarse):
sortedByDollarVolume = sorted(coarse, key=lambda c: c.DollarVolume, reverse=True)
filteredByPrice = [c.Symbol for c in sortedByDollarVolume if c.Price>10]
self.filter_coarse = filteredByPrice[:100]
return self.filter_coarse
def FineSelectionFilter(self, fine):
fine1 = [x for x in fine if x.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.FinancialServices]
sortedByMarketCap = sorted(fine1, key=lambda c: c.MarketCap, reverse=True)
filteredFine = [i.Symbol for i in sortedByMarketCap]
self.filter_fine = filteredFine[:3]
return self.filter_fine
def OnData(self, data):
k = [key.Value for key in data.Keys]
self.Debug(k)
#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
import statsmodels.api as model
from statsmodels.tsa.stattools import adfuller
# This code implements the pairs trading algorithm detailed in the lecture notes. Credit to the authors.
# The method of calculating the ADF is modified from code on slide 162 of lecture 8. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2018, 1, 1)
self.SetEndDate(2020, 1, 1)
self.SetCash(1000000)
self.SetWarmup(30)
self.p = "BAC"
self.q = "JPM"
self.AddEquity(self.p, Resolution.Daily)
self.AddEquity(self.q, Resolution.Daily)
self.lookback_window = 20 #days
#self.beta = 0.5624042647804646
#self.alpha = 2.718503876069648
#self.sigma = 0.020898581545820503
self.Debug("In Init")
def OnData(self, data):
if self.IsWarmingUp:
return
self.Debug(f"In OnData")
p_px = self.Securities[self.p].Price
q_px = self.Securities[self.q].Price
_, beta, alpha, _, sigma = self.calc_adf(self.History([self.p], self.lookback_window, resolution=Resolution.Daily)["close"].tolist(), self.History([self.q], self.lookback_window, resolution=Resolution.Daily)["close"].tolist())
self.Debug(f"P price: {p_px}; Q price: {q_px}")
residual = np.log(p_px) - np.log(q_px) * beta - alpha
self.Debug(residual)
z_idx = residual / sigma
entrythreshold = 2
exitthreshold = 0
wtp = 1 / (1 + beta)
wtq = beta / (1 + beta)
C = 1000000
self.Debug(f"Z-idx = {z_idx}")
#if not self.Portfolio[self.p].Invested:
if not self.Portfolio[self.p].IsLong and not self.Portfolio[self.p].IsShort:
if z_idx > entrythreshold:
self.MarketOrder(self.p, -wtp*C/p_px)
self.MarketOrder(self.q, wtq*C/q_px)
elif z_idx < -entrythreshold:
self.MarketOrder(self.p, wtp*C/p_px)
self.MarketOrder(self.q, -wtq*C/q_px)
elif self.Portfolio[self.p].IsLong:
if z_idx >= 0:
self.Liquidate(self.p)
self.Liquidate(self.q)
elif self.Portfolio[self.p].IsShort:
if z_idx <= 0:
self.Liquidate(self.p)
self.Liquidate(self.q)
def calc_adf(self, close1, close2):
# This method of calculating the ADF is modified from code on slide 162 of lecture 8. Credit to the authors.
logclose1 = np.log(close1)
logclose2 = np.log(close2)
ind = model.add_constant(logclose1)
dep = logclose2
leastsquare = model.OLS(dep, ind)
prod = leastsquare.fit()
slope = prod.params[1]
intercept = prod.params[0]
residuals = prod.resid
standard_dev_residuals = np.sqrt(prod.mse_resid)
# Residuals are mean zero. Therefore the z-index is simply observation over std dev.
z_index = residuals / standard_dev_residuals
adf = adfuller(residuals)
return adf[1], slope, intercept, z_index, standard_dev_residuals#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
import statsmodels.api as model
from statsmodels.tsa.stattools import adfuller
# This code implements the pairs trading algorithm detailed in the lecture notes. Credit to the authors.
# The method of calculating the ADF is modified from code on slide 162 of lecture 8. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.p = "GOOG"
self.q = "MS"
self.AddEquity(self.p, Resolution.Daily)
self.AddEquity(self.q, Resolution.Daily)
self.SetStartDate(2018, 1, 1)
self.SetEndDate(2020, 1, 1)
self.SetCash(100000)
self.SetWarmup(150)
# Amount of market positions I want to take in total
self.C = 10000
self.lookback_window = 150 #days
#self.beta = 0.5624042647804646
#self.alpha = 2.718503876069648
#self.sigma = 0.020898581545820503
self.Debug("In Init")
def OnData(self, data):
if self.IsWarmingUp:
return
p_px = self.Securities[self.p].Price
q_px = self.Securities[self.q].Price
hist_p = self.History([self.p], self.lookback_window, resolution=Resolution.Daily)["close"].tolist()
hist_q = self.History([self.q], self.lookback_window, resolution=Resolution.Daily)["close"].tolist()
_, beta, alpha, _, sigma = self.calc_adf(hist_p, hist_q)
residual = np.log(p_px) - np.log(q_px) * beta - alpha
z_idx = residual / sigma
entrythreshold = 1
exitthreshold = 0.7
wtp = 1 / (1 + beta)
wtq = beta / (1 + beta)
msg = f"-----\nResidual = {residual}\nZ_index = {z_idx}"
self.Debug(msg)
#if not self.Portfolio[self.p].Invested:
if not self.Portfolio[self.p].IsLong and not self.Portfolio[self.p].IsShort:
if z_idx > entrythreshold:
self.MarketOrder(self.p, -wtp*self.C/p_px)
self.MarketOrder(self.q, wtq*self.C/q_px)
elif z_idx < -entrythreshold:
self.MarketOrder(self.p, wtp*self.C/p_px)
self.MarketOrder(self.q, -wtq*self.C/q_px)
elif self.Portfolio[self.p].IsLong:
if z_idx >= 0:
self.Liquidate(self.p)
self.Liquidate(self.q)
elif self.Portfolio[self.p].IsShort:
if z_idx <= 0:
self.Liquidate(self.p)
self.Liquidate(self.q)
def calc_adf(self, close1, close2):
# This method of calculating the ADF is modified from code on slide 162 of lecture 8. Credit to the authors.
logclose1 = np.log(close1)
logclose2 = np.log(close2)
ind = model.add_constant(logclose1)
dep = logclose2
leastsquare = model.OLS(dep, ind)
prod = leastsquare.fit()
slope = prod.params[1]
intercept = prod.params[0]
residuals = prod.resid
standard_dev_residuals = np.sqrt(prod.mse_resid)
# Residuals are mean zero. Therefore the z-index is simply observation over std dev.
z_index = residuals / standard_dev_residuals
adf = adfuller(residuals)
return adf[1], slope, intercept, z_index, standard_dev_residuals#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
# This code implements the pairs trading algorithm detailed in the lecture notes. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2018, 1, 1)
self.SetEndDate(2020, 1, 1)
self.SetCash(1000000)
self.p = "BRK.B"
self.q = "BAC"
self.AddEquity(self.p, Resolution.Daily)
self.AddEquity(self.q, Resolution.Daily)
self.beta = 0.34557999277780166
self.alpha = 1.4501248536931102
self.sigma = 0.06514845048594146
def OnData(self, data):
self.Debug(f"In OnData")
p_px = self.Securities[self.p].Price
q_px = self.Securities[self.q].Price
self.Debug(f"P price: {p_px}; Q price: {q_px}")
residual = np.log(p_px) - np.log(q_px) * self.beta - self.alpha
self.Debug(residual)
z_idx = residual / self.sigma
entrythreshold = 2
exitthreshold = 0
wtp = 1 / (1 + self.beta)
wtq = self.beta / (1 + self.beta)
C = 1000000
self.Debug(f"Z-idx = {z_idx}")
# Risk Management Condition:
if np.abs(z_idx) > 3:
self.Liquidate(self.p)
self.Liquidate(self.q)
#if not self.Portfolio[self.p].Invested:
if not self.Portfolio[self.p].IsLong and not self.Portfolio[self.p].IsShort:
if z_idx > entrythreshold:
self.MarketOrder(self.p, -wtp*C/p_px)
self.MarketOrder(self.q, wtq*C/q_px)
elif z_idx < -entrythreshold:
self.MarketOrder(self.p, wtp*C/p_px)
self.MarketOrder(self.q, -wtq*C/q_px)
elif self.Portfolio[self.p].IsLong:
if z_idx >= 0:
self.Liquidate(self.p)
self.Liquidate(self.q)
elif self.Portfolio[self.p].IsShort:
if z_idx <= 0:
self.Liquidate(self.p)
self.Liquidate(self.q)
#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
import statsmodels.api as model
from statsmodels.tsa.stattools import adfuller
from datetime import timedelta
import math
# This code implements the pairs trading algorithm detailed in the lecture notes. Credit to the authors.
# The method of calculating the ADF is modified from code on slide 162 of lecture 8. Credit to the authors.
class Math585HW(QCAlgorithm):
def Initialize(self):
self.p = "GOOG"
self.q = "MS"
self.res = Resolution.Hour
self.AddEquity(self.p, self.res)
self.AddEquity(self.q, self.res)
self.SetStartDate(2019, 6, 1)
self.SetEndDate(2020, 1, 1)
self.SetCash(100000)
self.SetWarmup(150)
# Amount of capitol I want to risk
# The remainder of my cash will remain cash.
self.C = 10000
self.lookback_window = 150 #days
self.Debug("In Init")
#TOTO change
self.start = True
self.quit = False
def OnMarginCallWarning(self):
self.Debug("Liquidating due to Margin Call concern")
self.quit = True
self.SetHoldings(self.p, 0)
self.SetHoldings(self.q, 0)
def OnData(self, data):
self.Log("In OnData")
if self.IsWarmingUp:
self.Log("Warming Up")
return
if self.start:
#Gauarantee a starting purchase to make sure I have at least one trade no matter what the market does.
self.Debug("Starting")
self.start = False
self.SetHoldings(self.p, 0.05)
self.SetHoldings(self.q, -0.05)
return
if self.quit:
return
# Safety Check against Margin call
if self.Portfolio.MarginRemaining < 0.1 * self.Portfolio.TotalPortfolioValue:
self.Debug("Liquidating due to Margin Call concern")
self.quit = True
self.SetHoldings(self.p, 0)
self.SetHoldings(self.q, 0)
try:
p_px = self.Securities[self.p].Price
q_px = self.Securities[self.q].Price
hist_p = self.History([self.p], timedelta(days=self.lookback_window), resolution=self.res)["close"].tolist()
hist_q = self.History([self.q], timedelta(days=self.lookback_window), resolution=self.res)["close"].tolist()
#self.Debug(hist_p)
_, beta, alpha, z_indices, sigma = self.calc_adf(hist_p, hist_q)
residual = 0
z_idx = z_indices[-1]
entrythreshold = 1
exitthreshold = 0.7
wtp = 1 / (1 + beta)
wtq = beta / (1 + beta)
self.Debug(f"LT: Z_index: {z_idx}")
#if not self.Portfolio[self.p].Invested:
if not self.Portfolio[self.p].IsLong and not self.Portfolio[self.p].IsShort:
if z_idx > entrythreshold:
self.Debug("Opening Short Position")
self.MarketOrder(self.p, math.floor(-wtp*self.C/p_px))
self.MarketOrder(self.q, math.ceil(wtq*self.C/q_px))
elif z_idx < -entrythreshold:
self.Debug("Opening Long Position")
self.MarketOrder(self.p, math.ceil(wtp*self.C/p_px))
self.MarketOrder(self.q, math.floor(-wtq*self.C/q_px))
elif self.Portfolio[self.p].IsLong:
if z_idx > -exitthreshold:
self.Debug("Closing Position")
self.SetHoldings(self.p, 0)
self.SetHoldings(self.q, 0)
elif self.Portfolio[self.p].IsShort:
if z_idx < exitthreshold:
self.Debug("Closing Position")
self.SetHoldings(self.p, 0)
self.SetHoldings(self.q, 0)
except Error as e:
self.Debug("Caught an error; prevented code from crashing")
return
def calc_adf(self, close1, close2):
# This method of calculating the ADF is modified from code on slide 162 of lecture 8. Credit to the authors.
logclose1 = np.log(close1)
logclose2 = np.log(close2)
ind = model.add_constant(logclose1)
dep = logclose2
leastsquare = model.OLS(dep, ind)
prod = leastsquare.fit()
slope = prod.params[1]
intercept = prod.params[0]
residuals = prod.resid
standard_dev_residuals = np.sqrt(prod.mse_resid)
# Residuals are mean zero. Therefore the z-index is simply observation over std dev.
z_index = residuals / standard_dev_residuals
#adf = adfuller(residuals)
return None, slope, intercept, z_index, standard_dev_residuals#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
class Math585HW(QCAlgorithm):
def Initialize(self):
self.AddEquity("SPY")
self.AddEquity("MS")
self.AddEquity("XOM")
self.SetStartDate(2017, 1, 1)
self.SetEndDate(2018, 1, 1)
self.SetCash(1000000)
self.SetWarmup(30)
self.first_day = True
self.xom_wt = 0.5
self.spy_wt = 0
self.ms_wt = 0.5
def OnData(self, data):
if self.IsWarmingUp:
return
if self.first_day:
self.first_day = False
self.SetHoldings("MS", self.ms_wt)
self.SetHoldings("XOM", self.xom_wt)
self.SetHoldings("SPY", self.spy_wt)
#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.option_pos_size = 3600
self.SetStartDate(2021, 11, 1)
self.SetEndDate(2021, 11, 19)
self.SetCash(2000000)
self.SetWarmUp(25)
# Requesting data
self.underlying = self.AddEquity("QQQ").Symbol
option = self.AddOption("QQQ")
self.option_symbol = option.Symbol
# Set our strike/expiry filter for this option chain
option.SetFilter(0, 5, 0, 90)
self.contract = None
#Set Options Model: Equity Options are American Style
# As such, need to use CrankNicolsonFD or other models.
#BlackScholes Model is only for European Options
option.PriceModel = OptionPriceModels.CrankNicolsonFD()
self.day1 = True
self.historic = []
def OnData(self, slice: Slice) -> None:
self.Debug(f"in OnData")
if self.IsWarmingUp: #Wait until warming up is done
self.historic.append(self.Securities["QQQ"].Price)
return
# On Day 1, we need to identify:
# - the ATM contract
# - whether we are longing or shorting this contract
if self.day1 == True:
self.day1=False
# Calculate volatility over the past 25 days:
self.historic = self.historic[-25:]
self.Debug(f"Historic: {self.historic}")
# Calc log returns
logret = []
for i in range(1, len(self.historic)):
logret.append(self.historic[i] / self.historic[i-1])
hist_vol = np.std(logret)
self.Debug(f"Historic Volatility: {hist_vol}")
# Find the ATM option:
todayprice = self.Securities["QQQ"].Price
chain = slice.OptionChains.get(self.option_symbol)
if chain:
self.Debug("In Chain")
# Select call contracts
expiry = datetime(2021, 11, 19)
correct_right = [contract for contract in chain if contract.Right == OptionRight.Call]
correct_expiry = [contract for contract in correct_right if contract.Expiry == expiry]
good_strike = [contract for contract in correct_expiry if contract.Strike > todayprice]
# Search through available options to find the one with the lowest strike price in range:
best_option = good_strike[0]
best_strike = good_strike[0].Strike
for i in range(len(good_strike)):
if good_strike[i].Strike < best_strike:
best_option = good_strike[i]
best_strike = good_strike[i].Strike
self.atm = best_option
self.Debug(f"There are {len(correct_expiry)} options that match!")
self.Debug(f"Best option is {self.atm}")
ivol = self.atm.ImpliedVolatility
self.Debug(f"Implied Volatility is: {ivol}")
# Long or short option?
if ivol < hist_vol:
self.long_option = -1
else:
self.long_option = 1
self.Debug(f"Delta: {self.atm.Greeks.Delta}")
# Order the option in the correct quantity and direction
self.MarketOrder(self.atm.Symbol, self.option_pos_size * self.long_option)
self.last_delta = self.atm.Greeks.Delta
else:
# Not Day 1 work
# Code heavily based upon Pranay Jain (TA)'s Ed Post
# Credit to the source.
new_delta = self.atm.Greeks.Delta
hedge = self.option_pos_size * self.long_option * 100 * (new_delta - self.last_delta)
self.MarketOrder("QQQ", hedge)
self.last_delta = new_delta
#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.SetStartDate(2022, 10, 14)
self.SetEndDate(2023, 1, 1)
self.SetCash(100000)
self.SetWarmUp(20)
# Requesting data
self.underlying = self.AddEquity("AAPL").Symbol
option = self.AddOption("AAPL")
self.option_symbol = option.Symbol
# Set our strike/expiry filter for this option chain
option.SetFilter(-20, 4, 15, 30)
self.contract = None
#Set Options Model: Equity Options are American Style
# As such, need to use CrankNicolsonFD or other models.
#BlackScholes Model is only for European Options
option.PriceModel = OptionPriceModels.CrankNicolsonFD()
def OnData(self, slice: Slice) -> None:
self.Debug(f"in OnData")
if self.IsWarmingUp: #Wait until warming up is done
return
if self.Portfolio[self.underlying].Invested:
self.Liquidate(self.underlying)
if self.contract is not None and self.Portfolio[self.contract.Symbol].Invested:
return
chain = slice.OptionChains.get(self.option_symbol)
if chain:
self.Debug("In Chain")
# Select call contracts
correct_right = [contract for contract in chain if contract.Right == OptionRight.Call]
if len(correct_right) == 0:
return
self.Debug(f"Length Nonzero {correct_right[0].Strike}")
strike = 145
# expiry = "2022-11-18"
expiry = datetime(2022, 11, 18)
"""
# Select the call contracts with the furthest expiration
furthest_expiry = sorted(correct_right, key = lambda x: x.Expiry, reverse=True)[0].Expiry
furthest_expiry_calls = [contract for contract in calls if contract.Expiry == furthest_expiry]
# From the remaining contracts, select the one with its strike closest to the underlying price
self.contract = sorted(furthest_expiry_calls, key = lambda x: abs(chain.Underlying.Price - x.Strike))[0]
"""
strikes = []
for x in correct_right:
if x.Strike not in strikes:
strikes.append(x.Strike)
if x.Strike == strike and x.Expiry == expiry:
self.contract = x
break
if self.contract is None:
self.Debug("No contract me the requirements")
return
ivol = self.contract.ImpliedVolatility
self.Debug("+++++++++++++++++")
self.Debug("Implied Vol: " + str(ivol))
self.Debug("Ask Price: " + str(self.contract.AskPrice))
self.Debug("+++++++++++++++++")
self.Debug(strikes)
self.MarketOrder(self.contract.Symbol, 1)#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.SetStartDate(2022, 10, 14)
self.SetEndDate(2023, 1, 1)
self.SetCash(100000)
self.SetWarmUp(20)
# Requesting data
self.underlying = self.AddEquity("AAPL").Symbol
option = self.AddOption("AAPL")
self.option_symbol = option.Symbol
# Set our strike/expiry filter for this option chain
option.SetFilter(-30, -10, 15, 30)
self.contract = None
#Set Options Model: Equity Options are American Style
# As such, need to use CrankNicolsonFD or other models.
#BlackScholes Model is only for European Options
option.PriceModel = OptionPriceModels.CrankNicolsonFD()
def OnData(self, slice: Slice) -> None:
self.Debug(f"in OnData")
if self.IsWarmingUp: #Wait until warming up is done
return
if self.Portfolio[self.underlying].Invested:
self.Liquidate(self.underlying)
if self.contract is not None and self.Portfolio[self.contract.Symbol].Invested:
return
chain = slice.OptionChains.get(self.option_symbol)
if chain:
self.Debug("In Chain")
# Select call contracts
correct_right = [contract for contract in chain if contract.Right == OptionRight.Put]
if len(correct_right) == 0:
return
self.Debug(f"Length Nonzero {correct_right[0].Strike}")
strike = 130
# expiry = "2022-11-18"
expiry = datetime(2022, 11, 18)
"""
# Select the call contracts with the furthest expiration
furthest_expiry = sorted(correct_right, key = lambda x: x.Expiry, reverse=True)[0].Expiry
furthest_expiry_calls = [contract for contract in calls if contract.Expiry == furthest_expiry]
# From the remaining contracts, select the one with its strike closest to the underlying price
self.contract = sorted(furthest_expiry_calls, key = lambda x: abs(chain.Underlying.Price - x.Strike))[0]
"""
strikes = []
for x in correct_right:
if x.Strike not in strikes:
strikes.append(x.Strike)
if x.Strike == strike and x.Expiry == expiry:
self.contract = x
break
if self.contract is None:
self.Debug("No contract me the requirements")
return
ivol = self.contract.ImpliedVolatility
self.Debug("+++++++++++++++++")
self.Debug("Implied Vol: " + str(ivol))
self.Debug("Ask Price: " + str(self.contract.AskPrice))
self.Debug("+++++++++++++++++")
self.Debug(strikes)
self.MarketOrder(self.contract.Symbol, 1)#region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.SetStartDate(2022, 10, 14)
self.SetEndDate(2023, 1, 1)
self.SetCash(100000)
self.SetWarmUp(20)
# Requesting data
self.underlying = self.AddEquity("AAPL").Symbol
option = self.AddOption("AAPL")
self.option_symbol = option.Symbol
# Set our strike/expiry filter for this option chain
option.SetFilter(-30, -10, 30, 90)
self.contract = None
#Set Options Model: Equity Options are American Style
# As such, need to use CrankNicolsonFD or other models.
#BlackScholes Model is only for European Options
option.PriceModel = OptionPriceModels.CrankNicolsonFD()
def OnData(self, slice: Slice) -> None:
self.Debug(f"in OnData")
if self.IsWarmingUp: #Wait until warming up is done
return
if self.Portfolio[self.underlying].Invested:
self.Liquidate(self.underlying)
if self.contract is not None and self.Portfolio[self.contract.Symbol].Invested:
return
chain = slice.OptionChains.get(self.option_symbol)
if chain:
self.Debug("In Chain")
# Select call contracts
correct_right = [contract for contract in chain if contract.Right == OptionRight.Put]
if len(correct_right) == 0:
return
self.Debug(f"Length Nonzero {correct_right[0].Expiry}")
# expiry = "2022-11-18"
expiry = datetime(2022, 12, 16)
"""
# Select the call contracts with the furthest expiration
furthest_expiry = sorted(correct_right, key = lambda x: x.Expiry, reverse=True)[0].Expiry
furthest_expiry_calls = [contract for contract in calls if contract.Expiry == furthest_expiry]
# From the remaining contracts, select the one with its strike closest to the underlying price
self.contract = sorted(furthest_expiry_calls, key = lambda x: abs(chain.Underlying.Price - x.Strike))[0]
"""
strikes = []
for x in correct_right:
#self.Debug(f"Expiry: {x.Expiry} Strike: {x.Strike}")
if x.Strike == 125 and x.Expiry == expiry:
self.contract125 = x
break
for x in correct_right:
#self.Debug(f"Expiry: {x.Expiry} Strike: {x.Strike}")
if x.Strike == 130 and x.Expiry == expiry:
self.contract130 = x
break
self.Debug("+++++++++++++++++")
self.Debug("125 - Implied Vol: " + str(self.contract125.ImpliedVolatility))
self.Debug("125 - Ask Price: " + str(self.contract125.AskPrice))
self.Debug("130 - Implied Vol: " + str(self.contract130.ImpliedVolatility))
self.Debug("130 - Ask Price: " + str(self.contract130.AskPrice))
self.Debug("+++++++++++++++++")
self.Debug(strikes)
self.MarketOrder(self.contract.Symbol, 1)
# To prevent errors
self.contract = self.contract130# region imports
from AlgorithmImports import *
# endregion
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1) # Set Start Date
self.SetEndDate(2021, 1, 1)
self.SetCash(1000000) # Set Strategy Cash
self.AddEquity("AMZN", Resolution.Daily)
self.AddEquity("GOOG", Resolution.Daily)
self.GOOG_POS = 6000
self.AMZN_POS = -8000
self.init_run = True
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
print("Placing Market ORder")
self.MarketOrder("GOOG", self.GOOG_POS)
self.MarketOrder("AMZN", self.AMZN_POS)# region imports
from AlgorithmImports import *
# endregion
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1) # Set Start Date
self.SetEndDate(2021, 1, 1)
self.SetCash(1000000) # Set Strategy Cash
self.AddEquity("AMZN", Resolution.Daily)
self.AddEquity("GOOG", Resolution.Daily)
self.GOOG_POS = 6000
self.AMZN_POS = -8000
self.init_run = True
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
print("Placing Market ORder")
self.MarketOrder("GOOG", self.GOOG_POS)
self.MarketOrder("AMZN", self.AMZN_POS)
# On subsequent days, do the following:
else:
if self.Portfolio.TotalUnrealizedProfit < -100000:
self.MarketOrder("GOOG", -self.GOOG_POS)
self.MarketOrder("AMZN", -self.AMZN_POS)# region imports
from AlgorithmImports import *
# endregion
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1) # Set Start Date
self.SetEndDate(2021, 1, 1)
self.SetCash(1000000) # Set Strategy Cash
self.AddEquity("AMZN", Resolution.Daily)
self.AddEquity("GOOG", Resolution.Daily)
self.GOOG_POS = 6000
self.AMZN_POS = -8000
self.init_run = True
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
print("Placing Limit Order")
self.LimitOrder("GOOG", self.GOOG_POS, self.Securities["GOOG"].Price * 0.95)
self.LimitOrder("AMZN", self.AMZN_POS, self.Securities["AMZN"].Price * 1.05)
# On subsequent days, do the following:
else:
if self.Portfolio.TotalUnrealizedProfit < -100000:
self.MarketOrder("GOOG", -self.GOOG_POS)
self.MarketOrder("AMZN", -self.AMZN_POS)# region imports
from AlgorithmImports import *
# endregion
class Math585HW(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1) # Set Start Date
self.SetEndDate(2021, 1, 1)
self.SetCash(1000000) # Set Strategy Cash
self.AddEquity("AMZN", Resolution.Daily)
self.AddEquity("GOOG", Resolution.Daily)
self.GOOG_POS = 4221
self.AMZN_POS = -(8/6) * self.GOOG_POS
self.init_run = True
def OnData(self, data: Slice):
# On first call to OnData, do the following:
if self.init_run:
self.init_run = False
print("Placing Limit Order")
self.Debug(f"Longing {self.GOOG_POS} google shares and Shorting {self.AMZN_POS} amazon shares")
self.MarketOrder("GOOG", self.GOOG_POS)
self.MarketOrder("AMZN", self.AMZN_POS)
def OnMarginCall(self, r):
# Make sure that the user cannot miss the fact that a margin call occured.
# If a margin call occurs, an assertion error will pop up.
assert False #region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.option_pos_size = 3600
self.SetStartDate(2021, 11, 1)
self.SetEndDate(2021, 11, 19)
self.SetCash(2000000)
self.SetWarmUp(25)
# Requesting data
self.underlying = self.AddEquity("QQQ").Symbol
option = self.AddOption("QQQ")
self.option_symbol = option.Symbol
# Set our strike/expiry filter for this option chain
option.SetFilter(0, 5, 55, 90)
self.contract = None
#Set Options Model: Equity Options are American Style
# As such, need to use CrankNicolsonFD or other models.
#BlackScholes Model is only for European Options
option.PriceModel = OptionPriceModels.CrankNicolsonFD()
self.day1 = True
self.historic = []
def OnData(self, slice: Slice) -> None:
self.Debug(f"in OnData")
if self.IsWarmingUp: #Wait until warming up is done
return
# On Day 1, we need to identify:
# - the ATM contract
# - whether we are longing or shorting this contract
if self.day1 == True:
self.day1=False
todayprice = self.Securities["QQQ"].Price
#expiry = datetime(2021, 11, 19)
#self.Debug(f"QC Now: {self.Time}")
expiry = self.Time + timedelta(days=60)
self.search_contract(slice, self.option_symbol, todayprice, expiry, call=True)
def search_contract(self, slice, optsym, strike, expiry, call=True):
chain = slice.OptionChains.get(self.option_symbol)
if chain:
self.Debug("In Chain")
# Select call contracts
if call==True:
right = OptionRight.Call
else:
right = OptionRight.Put
correct_right = [contract for contract in chain if contract.Right == right]
correct_expiry = [contract for contract in correct_right if contract.Expiry >= expiry]
#good_strike = [contract for contract in correct_expiry if abs(contract.Strike - strike) < 10]
# No longer makes sense to do pre-filtering on strike price in this paradigm
good_strike = correct_expiry
# Search through available options to find the one with the lowest strike price in range:
best_option = good_strike[0]
best_strike = good_strike[0].Strike
for i in range(len(good_strike)):
if abs(good_strike[i].Strike - strike) < abs(best_strike - strike):
best_option = good_strike[i]
best_strike = good_strike[i].Strike
self.Debug(f"There are {len(good_strike)} options that match!")
self.Debug(f"Best option is {best_option}")
return best_option #region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
from datetime import date, timedelta, datetime
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.SetStartDate(2021, 11, 1)
self.SetEndDate(2021, 11, 19)
self.SetCash(2000000)
self.SetWarmUp(25)
# Requesting data
self.underlying = "QQQ"
tickers = [self.underlying]
self.options = {}
for tick in tickers:
self.sym = self.AddEquity(tick).Symbol
opt = self.AddOption(tick)
opt.SetFilter(0, 5, 0, 90)
self.options[tick] = opt.Symbol
# If we are putting the stock, we want a wide range of strikes
# Only consider strikes that are far out.
self.day1 = True
self.OPT_LOOKBEHIND = 60
self.OPT_LOOKAHEAD = 45
self.MC_N_SIMS = 50
self.OPT_QUANTILE = 0.1
def OnData(self, slice: Slice) -> None:
self.Debug(f"in OnData")
if self.IsWarmingUp: # Wait until warming up is done
return
self.Debug("No longer warming up")
def insurance(self, slice, l_sym, s_sym):
l_ret = self.get_returns(l_sym, 30)
px = self.Securities[self.sym].Price
mc = monte_carlo(px, ret, 40, 50)
worst = mc[int(np.floor(len(mc) * 0.1))]
best = mc[int(np.floor(len(mc) * 0.9))]
self.Debug(worst)
self.Debug(best)
# Suppose we are longing. We want put protection against this security.
#now = date.today()
#now = datetime.now()
#expiry = now + timedelta(days=8)
expiry = self.Time + timedelta(days=8)
px = self.Securities["QQQ"].Price
cn = self.get_option_contract_number(slice, self.options[self.underlying], expiry, px, call=True)
self.Debug(cn)
def search_contract(self, slice, optsym, strike, expiry, call=True):
chain = slice.OptionChains.get(self.option_symbol)
if chain:
self.Debug("In Chain")
# Select call contracts
if call==True:
right = OptionRight.Call
else:
right = OptionRight.Put
correct_right = [contract for contract in chain if contract.Right == right]
correct_expiry = [contract for contract in correct_right if contract.Expiry >= expiry]
#good_strike = [contract for contract in correct_expiry if abs(contract.Strike - strike) < 10]
# No longer makes sense to do pre-filtering on strike price in this paradigm
good_strike = correct_expiry
# Search through available options to find the one with the lowest strike price in range:
best_option = good_strike[0]
best_strike = good_strike[0].Strike
for i in range(len(good_strike)):
if abs(good_strike[i].Strike - strike) < abs(best_strike - strike):
best_option = good_strike[i]
best_strike = good_strike[i].Strike
self.Debug(f"There are {len(good_strike)} options that match!")
self.Debug(f"Best option is {best_option}")
return best_option
def get_returns(self, symbol, lookback):
df = self.History(symbol, lookback, Resolution.Daily)
closes = df.close.tolist()
closes = np.array(closes)
returns = closes[1:] / closes[:-1]
return returns
def monte_carlo(current_px, returns, lookforward, num_sims):
results = [None] * num_sims
for i in range(len(results)):
res = sim(current_px, returns, lookforward)
results[i] = res
results.sort()
return results
def sim(current_px, returns, lookforward):
u = np.random.randint(0, high=len(returns), size=lookforward)
simret = [0] * len(u)
for i in range(len(simret)):
simret[i] = returns[u[i]]
totalsimret = np.prod(simret)
return current_px * totalsimret
#region imports
from AlgorithmImports import *
#endregion
from datetime import timedelta, datetime
import math
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
import scipy.optimize as so
import scipy.integrate as si
import scipy.stats as ss
from math import log,exp,sqrt
import matplotlib.pyplot as plt
from scipy import sparse
from scipy.sparse.linalg import spsolve
from mpl_toolkits import mplot3d
from matplotlib import cm
import scipy.special as scsp
from scipy.integrate import quad
from scipy.interpolate import RegularGridInterpolator
class PairsTradingAlgorithm(QCAlgorithm):
def Initialize(self):
#if True:
self.pairs_list = [
['JNJ', 'ABBV'],
['DUK', 'AEP'],
['NKE', 'SBUX'],
['SPGI', 'MA'],
['DLR', 'CCI'],
['PM','PG'],
['TMO', 'UNH'],
['COP', 'EOG'],
['COP', 'EOG']]
# else:
#self.pairs_list = [
#['COP', 'EOG'],['ADBE', 'MSFT'],['TSM', 'ORCL'], \
#['CAT', 'BA']]
#self.SetStartDate(2017, 1, 1)
#self.SetEndDate(2021,1,1)
self.SetStartDate(2023, 4, 18)
self.SetEndDate(2023,4,24)
self.capital = 1000000
self.SetCash(self.capital)
self.SetWarmup(252)
self.X = None
self.num_MC = 1000
self.iteration = 3
self.enter = 2 # Set the enter threshold
self.risk_level = 2
self.exit = 0 # Set the exit threshold
self.lookback = 100 # Set the loockback period 90 days
self.dt = 1/self.lookback
self.wt = 1/len(self.pairs_list)
#new code below for list of pairs
self.z = 0
self.symbols_list =[]
for ticker1, ticker2 in self.pairs_list:
u1 = self.AddEquity(ticker1, Resolution.Daily).Symbol
u2 = self.AddEquity(ticker2, Resolution.Daily).Symbol
self.symbols_list.append([self.Symbol(ticker1),self.Symbol(ticker2)])
def compute_log_likelihood(self,params,*args):
theta, mu, sigma = params
X,dt = args
n= len(X)
sigma_tilde_squared = (sigma ** 2) * (1- exp(-2 * mu * dt))/(2 * mu)
Sum = 0
for i in range(1,len(X)):
Sum = Sum + (X[i] - X[i -1] * exp(-mu * dt) - theta*(1-exp(-mu * dt)))**2
Sum = -Sum / (2*n*sigma_tilde_squared)
loglikelihood = -0.5 * log(2 * math.pi) - log(sqrt(sigma_tilde_squared)) + Sum
return -loglikelihood
def MLE(self,X,dt,tol = 1e-10):
bounds = ((None,None),(1e-5,None),(1e-5,None)) # bondary for OU parameters
theta_init = X.mean()
initial_guess = (theta_init,1,1)
result = so.minimize(self.compute_log_likelihood,initial_guess,args = (X,dt),bounds = bounds)
theta,mu,sigma = result.x
return theta,mu,sigma
def OU_process_generator(self,mu,theta,sigma,N,iteration):
self.X = np.zeros((iteration,N))
p_5 = 0
p_50 = 0
p_95 = 0
for j in range(iteration):
for i in range(1,N):
W = ss.norm.rvs( loc=0, scale=1, size = 1)
self.X[j,i] = self.X[j,i-1] + theta*(mu - self.X[j,i-1]) * self.dt + sigma * np.sqrt(self.dt) * W
for i in range(iteration):
p_5 = p_5 + np.percentile(self.X[i],5)
p_50 = p_50 + np.percentile(self.X[i],50)
p_95 = p_95 + np.percentile(self.X[i],95)
return [p_5/iteration,p_50/iteration,p_95/iteration]
def stats(self, symbols):
#Use Statsmodels package to compute linear regression and ADF statistics
self.df = self.History(symbols, self.lookback)
self.dg = self.df["close"].unstack(level=0)
#self.Debug(self.dg)
ticker1= str(symbols[0])
ticker2= str(symbols[1])
Y = self.dg[ticker1].apply(lambda x: math.log(x))
X = self.dg[ticker2].apply(lambda x: math.log(x))
#self.Debug(f"Now regressing {ticker1} {ticker2}")
X = sm.add_constant(X)
model = sm.OLS(Y,X)
results = model.fit()
sigma = math.sqrt(results.mse_resid) # standard deviation of the residual
slope = results.params[1]
intercept = results.params[0]
res = results.resid #regression residual mean of res =0 by definition
zscore = res/sigma
adf = adfuller (res)
return [adf, zscore, slope, res]
def OnData(self, data):
if self.IsWarmingUp:
return
for pairs in self.pairs_list:
stats = self.stats([self.Symbol(pairs[0]),self.Symbol(pairs[1])])
self.beta = stats[2]
self.z= stats[1][-1]
res = stats[3]
#self.Debug(stats[1].values)
params = self.MLE(stats[1].values,self.dt)
#self.Debug(params)
threshold = self.OU_process_generator(params[0],params[1],params[2],self.num_MC,self.iteration)
self.Debug(threshold)
#self.Debug(self.wt)
#self.Debug( 1 * self.wt/(1+self.beta))
#self.Debug( 1 * -self.beta * self.wt/(1+self.beta))
#self.Debug(self.beta)
#self.Debug(stats[0])
#self.Debug(self.Portfolio[self.Symbol(pairs[0])].HoldingsValue)
#self.Debug(self.Portfolio[self.Symbol(pairs[1])].HoldingsValue)
self.Debug('z-score: '+ str(self.z))
if 0.5 <self.beta < 5:
if (not self.Portfolio[self.Symbol(pairs[0])].Invested) and self.z > threshold[2]:
self.SetHoldings(pairs[0], - 1 * self.wt/(1+self.beta))
self.SetHoldings(pairs[1], self.beta * self.wt/(1+self.beta))
if (not self.Portfolio[self.Symbol(pairs[0])].Invested) and self.z < threshold[0]:
self.SetHoldings(pairs[0], 1 * self.wt/(1+self.beta))
self.SetHoldings(pairs[1], -self.beta * self.wt/(1+self.beta))
if (self.Portfolio[self.Symbol(pairs[0])].IsShort and self.z < (threshold[2]+threshold[1])/4) or (self.Portfolio[self.Symbol(pairs[1])].IsShort and self.z > (threshold[1]+threshold[0])/4) :
self.Liquidate(pairs[0])
self.Liquidate(pairs[1]) #region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.option_pos_size = 3600
self.SetStartDate(2021, 11, 1)
self.SetEndDate(2021, 11, 19)
self.SetCash(2000000)
self.SetWarmUp(25)
# Requesting data
self.underlying = self.AddEquity("QQQ").Symbol
option = self.AddOption("QQQ")
self.option_symbol = option.Symbol
# Set our strike/expiry filter for this option chain
option.SetFilter(0, 5, 0, 90)
self.contract = None
#Set Options Model: Equity Options are American Style
# As such, need to use CrankNicolsonFD or other models.
#BlackScholes Model is only for European Options
option.PriceModel = OptionPriceModels.CrankNicolsonFD()
self.day1 = True
self.historic = []
def OnData(self, slice: Slice) -> None:
self.Debug(f"in OnData")
if self.IsWarmingUp: #Wait until warming up is done
self.historic.append(self.Securities["QQQ"].Price)
return
# On Day 1, we need to identify:
# - the ATM contract
# - whether we are longing or shorting this contract
if self.day1 == True:
self.day1=False
# Calculate volatility over the past 25 days:
self.historic = self.historic[-25:]
self.Debug(f"Historic: {self.historic}")
# Calc log returns
logret = []
for i in range(1, len(self.historic)):
logret.append(self.historic[i] / self.historic[i-1])
hist_vol = np.std(logret)
self.Debug(f"Historic Volatility: {hist_vol}")
# Find the ATM option:
todayprice = self.Securities["QQQ"].Price
chain = slice.OptionChains.get(self.option_symbol)
if chain:
self.Debug("In Chain")
# Select call contracts
expiry = datetime(2021, 11, 19)
correct_right = [contract for contract in chain if contract.Right == OptionRight.Call]
correct_expiry = [contract for contract in correct_right if contract.Expiry == expiry]
good_strike = [contract for contract in correct_expiry if contract.Strike > todayprice]
# Search through available options to find the one with the lowest strike price in range:
best_option = good_strike[0]
best_strike = good_strike[0].Strike
for i in range(len(good_strike)):
if good_strike[i].Strike < best_strike:
best_option = good_strike[i]
best_strike = good_strike[i].Strike
self.atm = best_option
self.Debug(f"There are {len(correct_expiry)} options that match!")
self.Debug(f"Best option is {self.atm}")
ivol = self.atm.ImpliedVolatility
self.Debug(f"Implied Volatility is: {ivol}")
# Long or short option?
if ivol < hist_vol:
self.long_option = -1
else:
self.long_option = 1
self.Debug(f"Delta: {self.atm.Greeks.Delta}")
# Order the option in the correct quantity and direction
self.MarketOrder(self.atm.Symbol, self.option_pos_size * self.long_option)
self.last_delta = self.atm.Greeks.Delta
else:
# Not Day 1 work
# Code heavily based upon Pranay Jain (TA)'s Ed Post
# Credit to the source.
new_delta = self.atm.Greeks.Delta
hedge = self.option_pos_size * self.long_option * 100 * (new_delta - self.last_delta)
self.MarketOrder("QQQ", hedge)
self.last_delta = new_delta
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
from datetime import timedelta, datetime
import math
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
'''
This code was developed by the teaching staff. It (before our modifications) is available in Sakai as teh solution to Assignment 5.
Credit to the authors.
'''
class PairsTradingAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2018, 1, 1)
self.SetEndDate(2019,1,1)
self.SetCash(1000000)
self.enter = 2 # Set the enter threshold
self.exit = 0 # Set the exit threshold
self.lookback = 90 # Set the loockback period 90 days
self.pairs =['GS','MS']
self.symbols =[]
for ticker in self.pairs:
self.AddEquity(ticker, Resolution.Daily)
self.symbols.append(self.Symbol(ticker))
self.AddUniverse(self.CoarseSelectionFilter, self.FineSelectionFunction)
#self.AddUniverse(coarse,self.FineSelectionFunction)
self.UniverseSettings.Resolution = Resolution.Daily
self.counter =0
self.dic = {}
def CoarseSelectionFilter(self, coarse):
sortedByDollarVolume = sorted(coarse, key=lambda c: c.DollarVolume, reverse=True)
filteredByPrice = [c.Symbol for c in sortedByDollarVolume if c.Price>10]
self.filter_coarse = filteredByPrice[:100]
return self.filter_coarse
def FineSelectionFunction(self, fine):
# selection based on morningstar sector code, in this example, technology code
# see document here https://www.quantconnect.com/docs/data-library/fundamentals#Fundamentals-Asset-Classification
fine1 = [x for x in fine if x.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.CommunicationServices]
#ranked by the market cap
sortedByMarketCap = sorted(fine1, key=lambda c: c.MarketCap, reverse=True)
filteredFine = [i.Symbol for i in sortedByMarketCap]
self.filter_fine = filteredFine[:3]
self.Debug("Filtered: " + str(len(self.filter_fine)))
return self.filter_fine
def stats(self, symbols):
#Use Statsmodels package to compute linear regression and ADF statistics
self.df = self.History(symbols, self.lookback)
self.dg = self.df["open"].unstack(level=0)
#self.Debug(self.dg)
ticker1= str(symbols[0])
ticker2= str(symbols[1])
Y = self.dg[ticker1].apply(lambda x: math.log(x))
X = self.dg[ticker2].apply(lambda x: math.log(x))
X = sm.add_constant(X)
model = sm.OLS(Y,X)
results = model.fit()
sigma = math.sqrt(results.mse_resid) # standard deviation of the residual
slope = results.params[1]
intercept = results.params[0]
res = results.resid #regression residual mean of res =0 by definition
zscore = res/sigma
adf = adfuller (res)
return [adf, zscore, slope]
def OnData(self, data):
if self.counter == 0:
# only do this one the first day
for sym1 in self.filter_fine:
for sym2 in self.filter_fine:
self.Debug ("Pairs: " + str(sym1) + "+" + str(sym2))
if sym1 != sym2:
adf = self.stats([sym1,sym2])[0][1] # This is the adf statsitics
self.Debug("Pair " + str(sym1.Value +'-' + sym2.Value)+ "=" + str(adf))
self.dic[str(sym1) +'-' + str(sym2)] = adf
#Find the pair with the least adf value
val = max(self.dic, key = self.dic.get)
l= val.find("-")
m= len(val)
self.sym1 = self.Symbol(val[:l])
self.sym2 = self.Symbol(val[l+1:m])
'''It is important to distinguish a ticker like "AMD" and its Symbol object. ticker can be obtained
from the corresponding symbol object using .Value method. You can get a Symbol object from a ticker
using self.AddEquity(ticker,Resolution.Daily). Symbol has more information than ticker, which is just a string
'''
self.Debug("ADF " + str(self.dic))
self.IsInvested = (self.Portfolio[self.sym1].Invested) or (self.Portfolio[self.sym2].Invested)
self.ShortSpread = self.Portfolio[self.sym1].IsShort
self.LongSpread = self.Portfolio[self.sym2].IsLong
stats = self.stats([self.sym1, self.sym2])
self.beta = stats[2]
zscore= stats[1][-1]
self.wt1 = 1/(1+self.beta)
self.wt2 = self.beta/(1+self.beta)
self.pos1 = self.Portfolio[self.sym1].Quantity
self.px1 = self.Portfolio[self.sym1].Price
self.pos2 = self.Portfolio[self.sym2].Quantity
self.px2 = self.Portfolio[self.sym2].Price
self.equity =self.Portfolio.TotalPortfolioValue
if self.IsInvested:
if self.ShortSpread and zscore <= self.exit or \
self.LongSpread and zscore >= self.exit:
self.Liquidate()
else:
if zscore > self.enter:
#short spread
#rememebr SetHoldings take a Symbol as its first variable.
self.SetHoldings(self.sym1, -self.wt1)
self.SetHoldings(self.sym2, self.wt2)
if zscore < - self.enter:
#long the spread
self.SetHoldings(self.sym1, self.wt1)
self.SetHoldings(self.sym2, -self.wt2)
self.pos1 = self.Portfolio[self.sym1].Quantity
self.pos2 = self.Portfolio[self.sym2].Quantity
self.Debug("sym1 " + str(self.sym1.Value) + " /w "+ str(self.pos1) + " sym2 " +str(self.sym2.Value) + " /w "+str(self.pos2))
self.Debug("Total Account Equity: "+ str( self.equity) + "Total Marginused: "+ str( self.Portfolio.TotalMarginUsed))
self.counter =self.counter+1#region imports
from AlgorithmImports import *
#endregion
from datetime import timedelta, datetime
import math
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
import scipy.optimize as so
import scipy.integrate as si
import scipy.stats as ss
from math import log,exp,sqrt
import matplotlib.pyplot as plt
from scipy import sparse
from scipy.sparse.linalg import spsolve
from mpl_toolkits import mplot3d
from matplotlib import cm
import scipy.special as scsp
from scipy.integrate import quad
from scipy.interpolate import RegularGridInterpolator
class PairsTradingAlgorithm(QCAlgorithm):
def Initialize(self):
#if True:
self.pairs_list = [ ['SRE', 'AEP']]
#['GOOG','TMUS']]
# else:
#self.pairs_list = [['CTVA', 'NEM'],['APD', 'LIN'],['HD', 'MCD'],['NKE', 'SBUX'],['SPGI', 'MA'],['WFC', 'BAC'], ['DLR', 'CCI'],['AMT', 'PSA'], ['KO', 'MDLZ'],['PM', 'PG'], \
#['TMO', 'UNH'],['JNJ', 'ABBV'], ['DUK', 'AEP'],['SO', 'XEL'], ['DIS', 'ATVI'],['VZ', 'T'], ['COP', 'EOG'],['SLB', 'BP'], ['ADBE', 'MSFT'],['TSM', 'ORCL'], \
#['CAT', 'BA']]
self.SetStartDate(2014, 1, 1)
self.SetEndDate(2020,1,1)
self.capital = 1000000
self.SetCash(self.capital)
self.SetWarmup(100)
self.X = None
self.num_MC = 1000
self.iteration = 3
self.enter = 2 # Set the enter threshold
self.risk_level = 2
self.exit = 0 # Set the exit threshold
self.lookback = 100 # Set the loockback period 90 days
self.dt = 1/self.lookback
self.wt = 1/len(self.pairs_list)
#new code below for list of pairs
self.z = 0
self.symbols_list =[]
for ticker1, ticker2 in self.pairs_list:
u1 = self.AddEquity(ticker1, Resolution.Daily).Symbol
u2 = self.AddEquity(ticker2, Resolution.Daily).Symbol
self.symbols_list.append([self.Symbol(ticker1),self.Symbol(ticker2)])
def compute_log_likelihood(self,params,*args):
theta, mu, sigma = params
X,dt = args
n= len(X)
sigma_tilde_squared = (sigma ** 2) * (1- exp(-2 * mu * dt))/(2 * mu)
Sum = 0
for i in range(1,len(X)):
Sum = Sum + (X[i] - X[i -1] * exp(-mu * dt) - theta*(1-exp(-mu * dt)))**2
Sum = -Sum / (2*n*sigma_tilde_squared)
loglikelihood = -0.5 * log(2 * math.pi) - log(sqrt(sigma_tilde_squared)) + Sum
return -loglikelihood
def MLE(self,X,dt,tol = 1e-10):
bounds = ((None,None),(1e-5,None),(1e-5,None)) # bondary for OU parameters
theta_init = X.mean()
initial_guess = (theta_init,1,1)
result = so.minimize(self.compute_log_likelihood,initial_guess,args = (X,dt),bounds = bounds)
theta,mu,sigma = result.x
return theta,mu,sigma
def OU_process_generator(self,mu,theta,sigma,N,iteration):
self.X = np.zeros((iteration,N))
p_5 = 0
p_50 = 0
p_95 = 0
for j in range(iteration):
for i in range(1,N):
W = ss.norm.rvs( loc=0, scale=1, size = 1)
self.X[j,i] = self.X[j,i-1] + theta*(mu - self.X[j,i-1]) * self.dt + sigma * np.sqrt(self.dt) * W
for i in range(iteration):
p_5 = p_5 + np.percentile(self.X[i],5)
p_50 = p_50 + np.percentile(self.X[i],50)
p_95 = p_95 + np.percentile(self.X[i],95)
return [p_5/iteration,p_50/iteration,p_95/iteration]
def stats(self, symbols):
#Use Statsmodels package to compute linear regression and ADF statistics
self.df = self.History(symbols, self.lookback)
self.dg = self.df["close"].unstack(level=0)
#self.Debug(self.dg)
ticker1= str(symbols[0])
ticker2= str(symbols[1])
Y = self.dg[ticker1].apply(lambda x: math.log(x))
X = self.dg[ticker2].apply(lambda x: math.log(x))
#self.Debug(f"Now regressing {ticker1} {ticker2}")
X = sm.add_constant(X)
model = sm.OLS(Y,X)
results = model.fit()
sigma = math.sqrt(results.mse_resid) # standard deviation of the residual
slope = results.params[1]
intercept = results.params[0]
res = results.resid #regression residual mean of res =0 by definition
zscore = res/sigma
adf = adfuller (res)
return [adf, zscore, slope, res]
def OnData(self, data):
if self.IsWarmingUp:
return
for pairs in self.pairs_list:
stats = self.stats([self.Symbol(pairs[0]),self.Symbol(pairs[1])])
self.beta = stats[2]
self.z= stats[1][-1]
res = stats[3]
#self.Debug(stats[1].values)
params = self.MLE(stats[1].values,self.dt)
#self.Debug(params)
threshold = self.OU_process_generator(params[0],params[1],params[2],self.num_MC,self.iteration)
self.Debug(threshold)
#self.Debug(self.wt)
#self.Debug( 1 * self.wt/(1+self.beta))
#self.Debug( 1 * -self.beta * self.wt/(1+self.beta))
#self.Debug(self.beta)
#self.Debug(stats[0])
#self.Debug(self.Portfolio[self.Symbol(pairs[0])].HoldingsValue)
#self.Debug(self.Portfolio[self.Symbol(pairs[1])].HoldingsValue)
self.Debug('z-score: '+ str(self.z))
if self.beta > 0:
if (not self.Portfolio[self.Symbol(pairs[0])].Invested) and self.z > threshold[2]:
self.SetHoldings(pairs[0], - 1 * self.wt/(1+self.beta))
self.SetHoldings(pairs[1], self.beta * self.wt/(1+self.beta))
if (not self.Portfolio[self.Symbol(pairs[0])].Invested) and self.z < threshold[0]:
self.SetHoldings(pairs[0], 1 * self.wt/(1+self.beta))
self.SetHoldings(pairs[1], -self.beta * self.wt/(1+self.beta))
if (self.Portfolio[self.Symbol(pairs[0])].IsShort and self.z < (threshold[2]+threshold[1])/4) or (self.Portfolio[self.Symbol(pairs[1])].IsShort and self.z > (threshold[1]+threshold[0])/4) :
self.Liquidate() #region imports
from AlgorithmImports import *
#endregion
from AlgorithmImports import *
import numpy as np
from datetime import date, timedelta, datetime
"""
This code was built starting from code provided by Lecture 12 on Sakai. Credit to the authors.
This code also follows the approach of and some code from https://www.quantconnect.com/learning/articles/introduction-to-options/quantconnect-options-api
Credit to the authors.
"""
class Math585HW(QCAlgorithm):
def Initialize(self) -> None:
self.SetStartDate(2021, 11, 1)
self.SetEndDate(2021, 11, 19)
self.SetCash(2000000)
self.SetWarmUp(25)
# Requesting data
tickers = ["AMZN", "MSFT"]
self.options = {}
for tick in tickers:
self.sym = self.AddEquity(tick).Symbol
opt = self.AddOption(tick)
opt.SetFilter(-100, 100, 75, 90)
self.options[tick] = opt.Symbol
self.day1 = True
self.OPT_LOOKBEHIND = 60
self.OPT_LOOKAHEAD = 45
self.MC_N_SIMS = 50
self.OPT_QUANTILE = 0.1
def OnData(self, slice: Slice) -> None:
#self.Debug(f"in OnData")
if self.IsWarmingUp: # Wait until warming up is done
return
#self.Debug("No longer warming up")
if self.day1:
self.day1 = False
# Suppose we are longing AMZN and shorting MSFT
l_ticker = "AMZN"
s_ticker = "MSFT"
l_shares = 100
s_shares = 200
# Pass in tickers for AMZN and MSFT, NOT symbol objects
l_opt, s_opt = self.insurance(slice, l_ticker, s_ticker)
self.MarketOrder(l_opt.Symbol, l_shares/100)
self.MarketOrder(s_opt.Symbol, s_shares/100)
def insurance(self, slice, l_sym, s_sym):
"""
l_sym: Ticker (not symbol) of the security we are trying to long
s_sym: Ticker (not symbol) of the secuirty we are trying to short
"""
expiry = self.Time + timedelta(days=self.OPT_LOOKAHEAD)
l_ret = self.get_returns(self.Symbol(l_sym), self.OPT_LOOKBEHIND)
l_px = self.Securities[l_sym].Price
l_mc = monte_carlo(l_px, l_ret, self.OPT_LOOKAHEAD, self.MC_N_SIMS)
low = l_mc[int(np.floor(len(l_mc) * self.OPT_QUANTILE))]
self.Debug(f"Current Px = {l_px}; Worst case px is {low}")
l_cn = self.search_contract(slice, self.options[l_sym], low, expiry, call=False)
rights = ["Call", "Put"]
self.Debug(f"Best Put Option is # {l_cn}: Strike {l_cn.Strike}; Right {rights[l_cn.Right]}; Expiry {l_cn.Expiry}")
s_ret = self.get_returns(self.Symbol(s_sym), self.OPT_LOOKBEHIND)
s_px = self.Securities[s_sym].Price
s_mc = monte_carlo(s_px, s_ret, self.OPT_LOOKAHEAD, self.MC_N_SIMS)
high = s_mc[int(np.floor(len(s_mc) * (1-self.OPT_QUANTILE)))]
self.Debug(f"Current Px = {s_px}; Worst (short) case px is {high}")
s_cn = self.search_contract(slice, self.options[s_sym], high, expiry, call=True)
self.Debug(f"Best Call Option is # {s_cn}: Strike {s_cn.Strike}; Right {rights[s_cn.Right]}; Expiry {s_cn.Expiry}")
return l_cn, s_cn
def search_contract(self, slice, optsym, strike, expiry, call=True):
chain = slice.OptionChains.get(optsym)
if chain:
self.Debug("In Chain")
# Select call contracts
if call==True:
right = OptionRight.Call
else:
right = OptionRight.Put
correct_right = [contract for contract in chain if contract.Right == right]
correct_expiry = [contract for contract in correct_right if contract.Expiry >= expiry]
#good_strike = [contract for contract in correct_expiry if abs(contract.Strike - strike) < 10]
# No longer makes sense to do pre-filtering on strike price in this paradigm
good_strike = correct_expiry
# Search through available options to find the one with the lowest strike price in range:
best_option = good_strike[0]
best_strike = good_strike[0].Strike
for i in range(len(good_strike)):
if abs(good_strike[i].Strike - strike) < abs(best_strike - strike):
best_option = good_strike[i]
best_strike = good_strike[i].Strike
self.Debug(f"There are {len(good_strike)} options that match!")
self.Debug(f"Best option is {best_option}")
return best_option
else:
self.Debug("No Chain")
def get_returns(self, symbol, lookback):
df = self.History(symbol, lookback, Resolution.Daily)
closes = df.close.tolist()
closes = np.array(closes)
returns = closes[1:] / closes[:-1]
return returns
def monte_carlo(current_px, returns, lookforward, num_sims):
results = [None] * num_sims
for i in range(len(results)):
res = sim(current_px, returns, lookforward)
results[i] = res
results.sort()
return results
def sim(current_px, returns, lookforward):
u = np.random.randint(0, high=len(returns), size=lookforward)
simret = [0] * len(u)
for i in range(len(simret)):
simret[i] = returns[u[i]]
totalsimret = np.prod(simret)
return current_px * totalsimret