| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -51.397 Tracking Error 0.047 Treynor Ratio 0 Total Fees $0.00 |
from datetime import timedelta
import numpy as np
from scipy import stats
from collections import deque
import math
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class conners_crash(QCAlgorithm):
#filteredByPrice = None
def Initialize(self):
self.SetStartDate(2018, 1, 1)
self.SetEndDate(2018, 1, 5)
self.SetCash(100000)
self.AddUniverse(self.CoarseSelectionFilter)
self.UniverseSettings.Resolution = Resolution.Daily
self.SetWarmUp(150)
self.UniverseSettings.Leverage = 1
#self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))
self.indi = {}
self.rsi = {}
self.vola = {}
self.indi_Filter = {}
self.rsi_Filter = {}
self.rsi_w = 3
self.streak_w = 3
self.pct_rank_w = 100
self.vola_w = 100
self.hist_vol = 100
self.max_num_shorts = 5
self.crsi_entry=80
self.crsi_flat=30
def CoarseSelectionFilter(self, coarse):
#filtered = [x for x in coarse if x.Price > 5 and x.DollarVolume > 1e9 ]
'''You should cap this to run at x day/day(s). It is currently running as often as the resolution and may be returning a very large number of stocks.'''
filtered = [x for x in coarse if x.Price > 5 and x.Volume > 1e6 ]
'''Run the algorithm without the 10 stock cap, and speed will be very slow'''
final_universe = [x.Symbol for x in filtered][:10] #This returns 10 stocks
self.Debug(len(final_universe))
return final_universe
def OnSecuritiesChanged(self, changes):
self.symbolDataBySymbol = {}
list_b1 = []
list_b2 = []
list_pop = []
symbols = [ x.Symbol for x in changes.AddedSecurities ]
for ticker in symbols:
if ticker not in self.symbolDataBySymbol:
crsi = CustomConnors( 'My_Custom', self.rsi_w, self.streak_w, self.pct_rank_w)
vola = CustomVolatility( 'My_Custom', self.vola_w )
self.RegisterIndicator(ticker, crsi, Resolution.Daily)
self.RegisterIndicator(ticker, vola, Resolution.Daily)
history = self.History(ticker, max(self.rsi_w, self.streak_w, self.pct_rank_w,self.vola_w), Resolution.Daily)
crsi.WarmUp(history)
vola.WarmUp(history)
symbolData = SymbolData(ticker, crsi, vola)
self.symbolDataBySymbol[ticker] = symbolData
'''
Carsten:
My original version was this below (as you suggested), but i lost somehow ticker in OnData, thas why I disbled
the pop for the ticker which were in the portfolio. Now we get an issue in OnData'''
symbolsRemoved = [ x.Symbol for x in changes.RemovedSecurities ]
for ticker in symbolsRemoved:
self.symbolDataBySymbol.pop(ticker, None)
'''After created an array of removed securities. You need to remove the securities
from the self.symbolDataBySymbol dictionary by doing something along the lines:
for stock in symbolsRemoved:
self.symbolDataBySymbol.pop(stock, None)'''
'''I recommend moving the below lines of code to a new method such as OnData. It is important to keep separation of concerns (OnSecuritiesChanged
should only handle incoming and exiting securities; evaluating whether a security is short or not can be done in OnData. '''
#stocks_short = [ x.Symbol for x in self.Portfolio.Values if x.IsShort ]
#for ticker in stocks_short:
# list_b1.append(str(ticker))
#for ticker in symbolsRemoved:
# if not ticker in stocks_short:
# # self.symbolDataBySymbol.pop(ticker, None)
# list_pop.append(str(ticker))
# else:
# list_b2.append(str(ticker))
#
#a= ticker
def OnData(self, data):
'''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
Arguments:
data: Slice object keyed by symbol containing the stock data
1. The stock’s closing price must be greater than $5/share.
2. average day volume 21 trading > 1e6 shares a day
3. 100-day Historical Volatility >= 100%.
4. Enter Short on ConnorsRSI (CRSI,3,2,10) > 90 (RSI, Streak, Percentage Change)
5. Short next day on a limit order 5% higher.
6. The exit Trade ConnorsRSI < 20
Results from 2007-2017 should be:
# Trades: around 500
Win Rate: 73%
Avg. Profit Per Trade: 6.5%
Avg. Days per Tr%ade: 12
Avg. Win: 16.5%
Avg. Loss: 19.5 %
'''
stocks_short = []
neutral = []
list_a = []
list_b = []
# only stock with volatilytiy > 100%
filter1 = [x[0] for x in self.symbolDataBySymbol.items() if (self.symbolDataBySymbol[x[0]].VOLA.Value > self.hist_vol) ]
entry = [x for x in filter1 if (self.symbolDataBySymbol[x].CRSI.Value > self.crsi_entry) ]
# find short stocks
stocks_short = [ x.Symbol for x in self.Portfolio.Values if x.IsShort ]
'''
Carsten:
for testing: you can see stocks_short does not match with self.symbolDataBySymbol'''
#for x in self.symbolDataBySymbol:
# list_a.append(str(x))
#for x in stocks_short:
# list_b.append(str(x))
'''
Carsten:
to run, you would need this part, but it's not what it want to do '''
#for x in stocks_short:
# if not x in self.symbolDataBySymbol:
# neutral.append(x)
# else:
# if self.symbolDataBySymbol[x].CRSI.Value < self.crsi_flat < 30:
# neutral.append(x)
'''
Carsten:
Now we get an issue, because the ticker which are in [ x.Symbol for x in self.Portfolio.Values if x.IsShort ]
are not in self.symbolDataBySymbol, I don't know why'''
neutral = [x for x in stocks_short if (self.symbolDataBySymbol[x].CRSI.Value < self.crsi_flat)]
# calculating size
actuall_shorts = len(stocks_short)
new_entrys = self.max_num_shorts - actuall_shorts
portfolio_cash = self.Portfolio.Cash # Sum of all currencies in account (only settled cash)
one_positon_cash = portfolio_cash / new_entrys
if new_entrys and entry:
for ticker in entry:
if new_entrys <= 0:
continue
numer_stocks = one_positon_cash * 0.95 / self.Securities[ticker].Price
self.MarketOrder(ticker, -numer_stocks)
new_entrys-=1
for ticker in neutral:
self.Liquidate(ticker)
class SymbolData:
'''Contains data specific to a symbol required by this model'''
def __init__(self, symbol, crsi, vola):
self.Symbol = symbol
self.CRSI = crsi
self.VOLA = vola
class CustomConnors:
def __init__(self, name, rsi_p, streak_p, pct_rank_p):
period = max(rsi_p, streak_p, pct_rank_p)
self.Name = name
self.Time = datetime.min
self.IsReady = False
self.Value = 0
self.Rsi = 0
self.Streak = 0
self.Pct_Rank = 0
self.queue = deque(maxlen=period)
self.rsi_p = rsi_p
self.streak_p = streak_p
self.pct_rank_p = pct_rank_p
def __repr__(self):
return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)
# Update method is mandatory
def Update(self, input):
return self.Update_Main(input.Time, input.Close)
def Update_warmup(self, input):
return self.Update_Main(input.Index, input.close)
def Update_Main(self, time, value):
#def Update(self, input):
#self.queue.appendleft(input.close) # used by warm up
#self.queue.appendleft(input.Close) # used by RegisterIndicator
self.queue.appendleft(value) # neutral
count = len(self.queue)
#self.Time = input.Index # used by warm up
#self.Time = input.Time # used by RegisterIndicator
self.Time = time # neutral
self.IsReady = count == self.queue.maxlen
#### start here the indicator calulation
if self.IsReady:
# Connors streak indicator
# Cassic RSI from zipline
#"100 - (100 / (1 + (ups / downs)))"
close = np.array(self.queue)[-self.rsi_p:]
diffs = np.diff(close)
ups = np.nanmean(np.clip(diffs, 0, np.inf))
downs = abs(np.nanmean(np.clip(diffs, -np.inf,0)))
if downs == 0:
self.Rsi = 100
else:
self.Rsi = 100 - (100 / (1 + (ups / downs)))
# Connors streak indicator
curstreak = 0
close = np.array(self.queue)[-self.streak_p:]
l=len(close)
streak = np.zeros(l)
for i in range(l):
if close[i-1] < close[i]:
streak[i] = curstreak = max(1, curstreak + 1)
elif close[i-1] > close[i]:
streak[i] = curstreak = min(-1, curstreak - 1)
else:
streak[i] = curstreak = 0
# cal the rsi from streak
diffs = np.diff(streak)
ups = np.nanmean(np.clip(diffs, 0, np.inf))
downs = abs(np.nanmean(np.clip(diffs, -np.inf, 0)))
if downs == 0:
self.Streak = 100
else:
self.Streak = 100 - (100 / (1 + (ups / downs)))
# Connors Pct Rank Indicator
#cl = np.zeros(count)
close = np.array(self.queue)[-self.pct_rank_p:]
#close = np.array([[1, 1],[2, 2],[4,4],[15,15],[226,22]])
daily_returns = np.diff(close) / close[0:-1]
# today return greater than how many past returns
today_gt_past = daily_returns[-1] > daily_returns [0:-1]
# sum of today > past
num = sum(today_gt_past)
# sum as as percentage
l=np.shape(today_gt_past)[0]
self.Pct_Rank = num / l * 100
# combined Connor RSI
self.Value = (self.Rsi + self.Streak + self.Pct_Rank ) / 3.0
#self.Value = (self.Rsi + self.Streak ) / 2.0
#### finish the custom indicator
return self.IsReady
def WarmUp(self,history):
for tuple in history.itertuples():
self.Update_warmup(tuple)
class CustomVolatility:
def __init__(self, name, period):
self.Name = name
self.Time = datetime.min
self.IsReady = False
self.Value = 0
self.queue = deque(maxlen=period)
def __repr__(self):
return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(self.Name, self.IsReady, self.Time, self.Value)
# Update method is mandatory
def Update(self, input):
return self.Update_Main(input.Time, input.Close)
def Update_warmup(self, input):
return self.Update_Main(input.Index, input.close)
def Update_Main(self, time, value):
self.queue.appendleft(value) # neutral
count = len(self.queue)
self.Time = time # neutral
self.IsReady = count == self.queue.maxlen
#### start here the indicator calulation
if self.IsReady:
# [0:-1] is needed to remove last close since diff is one element shorter
close = np.array(self.queue)
log_close = np.log(close)
diffs = np.diff(log_close)
self.Value = diffs.std() * math.sqrt(252) * 100
return self.IsReady
def WarmUp(self,history):
for tuple in history.itertuples():
self.Update_warmup(tuple)