| Overall Statistics | |
|---|---|
| Total Trades | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | -46.106 |
| Tracking Error | 0.221 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
import math
# Data resolution used for every subscription added by the algorithm.
res = Resolution.Minute

# Key into ``pairs_list`` selecting which group of pairs to draw from.
asset_class = 'stocks'

# Candidate symbol pairs, grouped by asset class.
pairs_list = dict(
    stocks=[
        ('CVX', 'XOM'),
        ('EGBN', 'FMBI'),
        ('SPY', 'IWM'),
    ],
    forex=[
        ('EURUSD', 'AUDUSD'),
    ],
    crypto=[
        ('BTCUSD', 'ETHUSD'),
    ],
)

# Index 2 picks the third pair of the chosen class; only 'stocks' currently
# lists three pairs, so other asset classes need a different index here.
symbol1, symbol2 = pairs_list[asset_class][2]

# Sentinel used as the starting value for running minimum tracking.
infinity = float('inf')

# Alias for the type of ``None``.
NoneType = type(None)
class MeanReversionResearch(QCAlgorithm):
    """Research algorithm that charts the running z-score of a pair's price spread.

    The spread is ``price(symbol1) - price(symbol2)``. Each time both symbols
    have delivered a fresh price, the spread is fed to a ``SpreadRatio``
    accumulator and the resulting z-score is plotted on the "Z-Score" chart.
    No orders are placed in this class.

    Tracked state:
        last_price1 / last_price2: most recent unconsumed price per symbol.
        lowest_zscore: smallest absolute z-score observed so far.
        min_delta: spread observed when ``lowest_zscore`` was last updated.
        max_delta: largest spread observed so far.
    """

    def Initialize(self):
        """Set the backtest window and cash, register the chart, subscribe to
        both symbols and reset the tracking state."""
        self.SetStartDate(2020, 11, 4)
        self.SetEndDate(2020, 11, 6)
        self.SetCash(100_000)

        self.ZScoreSeries = Series("Z-Score", SeriesType.Line, 0)
        chart = Chart("Z-Score")
        chart.AddSeries(self.ZScoreSeries)
        self.AddChart(chart)

        if asset_class == 'stocks':
            self.AddEquity(symbol1, res)
            self.AddEquity(symbol2, res)
        elif asset_class == 'forex':
            self.AddForex(symbol1, res)
            self.AddForex(symbol2, res)
        elif asset_class == 'crypto':
            self.AddCrypto(symbol1, res)
            self.AddCrypto(symbol2, res)

        self.last_price1 = None
        self.last_price2 = None
        # Start the running maximum at -inf so a spread that is always
        # negative is still tracked (a start of 0 would never be exceeded).
        self.max_delta = -infinity
        self.min_delta = 0
        self.lowest_zscore = infinity
        self.Spread = SpreadRatio(f'{symbol1}_{symbol2}')

    @staticmethod
    def _latest_price(data, symbol):
        """Return the newest price for *symbol* in *data*, or None if absent.

        At tick resolution the last element's ``Price`` is used; otherwise the
        entry's ``Close`` is used.
        """
        if not data.ContainsKey(symbol) or data[symbol] is None:
            return None
        if res == Resolution.Tick:
            return data[symbol][-1].Price
        return data[symbol].Close

    def OnData(self, data):
        """Record new prices and, once both symbols have one, update the
        z-score chart and the spread extremes.

        Args:
            data: the data slice delivered by the engine for this time step.
        """
        price1 = self._latest_price(data, symbol1)
        if price1 is not None:
            self.last_price1 = price1
        price2 = self._latest_price(data, symbol2)
        if price2 is not None:
            self.last_price2 = price2

        # Wait until both legs have a fresh price before computing a spread.
        if self.last_price1 is None or self.last_price2 is None:
            return

        price_delta = self.last_price1 - self.last_price2
        zscore = self.Spread.GetZScore(price_delta)
        self.ZScoreSeries.AddPoint(self.Time, zscore)

        # Store the magnitude: the comparison is on abs(zscore), so keeping a
        # signed (negative) value would block every later update.
        if abs(zscore) < self.lowest_zscore:
            self.min_delta = price_delta
            self.lowest_zscore = abs(zscore)

        if price_delta > self.max_delta:
            self.max_delta = price_delta
            self.Debug(f'max delta: {self.max_delta - self.min_delta}')

        # Consume both prices so the next spread uses fresh data on each leg.
        self.last_price1 = None
        self.last_price2 = None
class SpreadRatio:
    """Running mean/variance accumulator that scores each new observation.

    Attributes:
        label: free-form identifier supplied by the caller.
        squared_differences: running sum of squared deviations from the mean.
        mean: running mean of all observations absorbed so far.
        var: ``squared_differences`` divided by the number of observations.
        N: one more than the number of observations absorbed so far.
    """

    def __init__(self, label):
        """Create an empty accumulator tagged with *label*."""
        self.label = label
        self.N = 1
        self.mean = 0
        self.var = 0
        self.squared_differences = 0

    def GetZScore(self, ratio):
        """Absorb *ratio* into the running statistics and return its z-score.

        The statistics are updated first (Welford's online algorithm), so the
        score is measured against a mean and variance that already include
        *ratio*. While the variance is exactly zero the plain deviation from
        the mean is returned instead of dividing by zero.
        """
        offset_from_old_mean = ratio - self.mean
        updated_mean = self.mean + offset_from_old_mean / self.N
        self.squared_differences += offset_from_old_mean * (ratio - updated_mean)
        self.var = self.squared_differences / self.N
        self.N += 1
        self.mean = updated_mean

        deviation = ratio - self.mean
        std_dev = math.sqrt(self.var)
        if std_dev == 0.0:
            return deviation
        return deviation / std_dev