# Overall Statistics
#
# Total Trades: 3896
# Average Win: 0.19%
# Average Loss: -0.20%
# Compounding Annual Return: 8.907%
# Drawdown: 21.200%
# Expectancy: 0.424
# Net Profit: 531.404%
# Sharpe Ratio: 0.684
# Probabilistic Sharpe Ratio: 3.027%
# Loss Rate: 27%
# Win Rate: 73%
# Profit-Loss Ratio: 0.94
# Alpha: 0.062
# Beta: 0.048
# Annual Standard Deviation: 0.096
# Annual Variance: 0.009
# Information Ratio: -0.038
# Tracking Error: 0.179
# Treynor Ratio: 1.372
# Total Fees: $0.00
# Estimated Strategy Capacity: $0
# Lowest Capacity Asset: NDX.NDX 2S
# Portfolio Turnover: 2.64%
# region imports
from AlgorithmImports import *
from statistics import pstdev
# endregion
class SPYWEEKLYRSI(QCAlgorithm):
    '''Weight a basket of index feeds by their RSI z-scores.

    Each subscribed index gets a SymbolData bundle exposing ``rsiz``, the
    z-score of its RSI. On every slice, indices whose z-score is ready and
    positive receive a target weight equal to their share of the summed
    positive z-scores (rounded to two decimals). When the SPX z-score falls
    below -0.5 while the portfolio is invested, everything is liquidated.

    Despite the class name, all subscriptions use ``Resolution.Daily``.
    '''

    def Initialize(self):
        '''Set dates, cash and benchmark, subscribe the index feeds, and request warm-up.'''
        self.SetStartDate(2002, 1, 1)  # Set Start Date
        self.SetCash(10000000)  # Set Strategy Cash
        rsi_lookback = 26
        ma_lookback = 52
        self.SetBenchmark('SPY')

        # Per-symbol indicator bundles, keyed by Symbol.
        self.Data = {}

        # Custom data feeds, subscribed in this order. SPX stays first because
        # OnData relies on reaching the SPX exit check before any target has
        # been collected.
        index_feeds = [
            (SPX, "SPX"),
            (DAX, "DAX"),
            (HSI, "HSI"),
            (MXCN, "MXCN"),
            (MXEF, "MXEF"),
            (NDX, "NDX"),
            (NKY, "NKY"),
            (SX6E, "SX6E"),
            (UKX, "UKX"),
        ]
        for data_type, ticker in index_feeds:
            security = self.AddData(data_type, ticker, Resolution.Daily)
            self.Data[security.Symbol] = SymbolData(
                self, security.Symbol, rsi_lookback, ma_lookback)

        self.SetWarmUp((rsi_lookback + ma_lookback) * 8)

    def OnData(self, data: Slice):
        '''Rebalance towards z-score weights and plot each holding's share.

        :param data: The current slice; the logic reads indicator state from
            ``self.Data`` and does not access the slice itself.
        '''
        # Do not trade, log or plot while the algorithm is still warming up.
        if self.IsWarmingUp:
            return

        # Denominator of the weights: only ready, positive z-scores count.
        rsizSum = sum(
            sd.rsiz.Current.Value
            for sd in self.Data.values()
            if sd.rsiz.IsReady and sd.rsiz.Current.Value > 0
        )

        targets = []
        if rsizSum != 0:
            for symbol, sd in self.Data.items():
                # A z-score that is not ready is excluded from rsizSum, so it
                # must not contribute a weight either; otherwise the weights
                # could add up to more than 1.
                if not sd.rsiz.IsReady:
                    continue
                zscore = sd.rsiz.Current.Value

                if symbol.Value == 'SPX' and zscore < -0.5:
                    if self.Portfolio.Invested:
                        # SPX z-score below -0.5 while invested: exit everything
                        # and stop collecting targets for this slice.
                        # NOTE(review): the nesting of this `break` was
                        # reconstructed from a source that had lost its
                        # indentation; confirm it belongs with Liquidate().
                        self.Liquidate()
                        break

                if zscore > 0:
                    weighting = round(zscore / rsizSum, 2)
                    targets.append(PortfolioTarget(symbol, weighting))

        # set holdings (an empty list places no orders)
        self.SetHoldings(targets)

        self.Log(str(self.Time) + ' rsiz sum: ' + str(rsizSum))

        # Plot each holding's share of the total holdings value.
        if self.Portfolio.Invested:
            totalHoldings = self.Portfolio.TotalHoldingsValue
            for kvp in self.Portfolio:
                holding = kvp.Value
                if holding.Invested and totalHoldings != 0:
                    perHoldings = holding.AbsoluteHoldingsValue / totalHoldings
                else:
                    perHoldings = 0
                self.Plot('Holdings%', holding.Symbol.Value, perHoldings)
class SymbolData(object):
    '''Indicator bundle for one symbol that exposes the z-score of its RSI.

    The z-score is assembled from chained indicators:
    ``rsiz = (rsi - SMA(rsi, malookback)) / StdDev(rsi, malookback)``.
    '''
    def __init__(self, algorithm, symbol, rsilookback, malookback):
        '''Create the RSI and the indicators derived from it.

        :param algorithm: Algorithm instance used to create the RSI indicator.
        :param symbol: Symbol the RSI is created for.
        :param rsilookback: Period passed to the RSI.
        :param malookback: Period of the moving average and standard deviation
            taken over the RSI.
        '''
        self.algorithm = algorithm
        self.symbol = symbol
        # Create the indicators
        # RSI of the symbol over rsilookback periods.
        self.rsi = algorithm.RSI(symbol, rsilookback)
        # Simple moving average of the RSI over malookback periods.
        self.rsisma = IndicatorExtensions.SMA(self.rsi, malookback)
        # Standard deviation of the RSI over malookback periods.
        self.rsistddev = IndicatorExtensions.Of(StandardDeviation(malookback), self.rsi)
        # RSI minus its moving average.
        self.numerator = IndicatorExtensions.Minus(self.rsi, self.rsisma)
        # Z-score of the RSI: numerator divided by the standard deviation.
        self.rsiz = IndicatorExtensions.Over(self.numerator, self.rsistddev)
        # No explicit warm-up call is made in this constructor.
class SPX(PythonData):
    '''SPX Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vSpeJIoIHEqfZiUqx3HMk28SFBAGGwrGtZLI06v7gwP77G_-HOsdzV9IkAMdBM_RwtYhvINgTM6RS9a/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into an SPX data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns an SPX instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New SPX object
        index = SPX()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['Open'] = float(fields[1])
            index['High'] = float(fields[2])
            index['Low'] = float(fields[3])
            index["Close"] = close
            index["Volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class DAX(PythonData):
    '''DAX Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vRbkHJYzOmIe6njUpC7Vk6ujpoEijo5Vdf1MfmOkrT-I_oPaSDJm7FyhFRchLMDtjXs3A7f82nLqFOJ/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into a DAX data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns a DAX instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New DAX object
        index = DAX()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class HSI(PythonData):
    '''HSI Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vQ9Ga6HAFfT6DRwei-zzuAcXdJifL_6JXYhiFXXa9fEkWYrm9ET7dHXx_LayDE5Qn5LE2OhjvLN9a8o/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into an HSI data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns an HSI instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New HSI object
        index = HSI()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class MXCN(PythonData):
    '''MXCN Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vQdl3pqBTiVHzSN1aY90qNGuYxUX6ozYfHpHd-7MWjDCpCmhnovuvVqHgGhmUlIeLinggAgN7rLzGSV/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into an MXCN data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns an MXCN instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New MXCN object
        index = MXCN()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class MXEF(PythonData):
    '''MXEF Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vT0vsxYCrpW3zGHKEAQNdiE62kLU_9zL2ccA5jPS6zBmF8vHBnU2o9EMbssEoo5EYFrMOmX3BX-dOKw/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into an MXEF data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns an MXEF instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New MXEF object
        index = MXEF()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class NDX(PythonData):
    '''NDX Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vRgQwBkjdjPFTPmp6edtqUzfrYqx7saEBz5JjlmzKYjJAzCzkGdeubNsnWzj4nWYSNqElEu65AS-Su7/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into an NDX data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns an NDX instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New NDX object
        index = NDX()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class NKY(PythonData):
    '''NKY Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vShzf8gbR_Y0B2VtEabDm39TesUs-iFDBkKy_DdN6rIhvDyW8ojwkPC2-C7rKwcBWdY_snB1rZDPWpF/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into an NKY data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns an NKY instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New NKY object
        index = NKY()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class SX6E(PythonData):
    '''SX6E Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vSjjCrkk7cCfa_hETYUWRdlVGEWQdKb98NRwlRvwwEzzDMtfD1VnYgUlGkG-hFsoaX-pwe4x7mqcPfm/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into an SX6E data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns an SX6E instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New SX6E object
        index = SX6E()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index
class UKX(PythonData):
    '''UKX Index Custom Data Class

    Rows of the published Google Sheets CSV are read as:
    date (YYYY-MM-DD), open, high, low, close, volume.
    '''

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        '''Return the remote CSV file that backs this subscription.'''
        url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vRHfjNeIvwe3RoB0rd-DW6ff4OyDRgHXj8ielG0Y9cLm1rwRq0EBFRpX7PRg9QNRvjpxDPNjtRlokcQ/pub?output=csv"
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        '''Parse one CSV line into a UKX data point.

        Returns None for blank lines, lines whose first character is not a
        digit, rows whose date or close cannot be parsed, and rows whose close
        is 0. Otherwise returns a UKX instance valued at the close.
        '''
        if not (line.strip() and line[0].isdigit()):
            return None

        fields = line.split(',')

        # Date and close are required. Without them the point has no usable
        # timestamp or value, so the row is skipped rather than emitted
        # half-populated.
        try:
            close = float(fields[4])
            bar_time = datetime.strptime(fields[0], "%Y-%m-%d")
        except (IndexError, ValueError):
            return None
        if close == 0:
            return None

        # New UKX object
        index = UKX()
        index.Symbol = config.Symbol
        index.Value = close
        index.Time = bar_time
        index.EndTime = bar_time + timedelta(days=1)

        # The remaining columns are best-effort: a short or malformed row still
        # yields a point carrying the close.
        try:
            index['open'] = float(fields[1])
            index['high'] = float(fields[2])
            index['low'] = float(fields[3])
            index["close"] = close
            index["volume"] = float(fields[5])
        except (IndexError, ValueError):
            pass
        return index