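# Overall Statistics
#
# | Metric                       | Value     |
# |------------------------------|-----------|
# | Total Trades                 | 681       |
# | Average Win                  | 0.93%     |
# | Average Loss                 | -1.31%    |
# | Compounding Annual Return    | -33.809%  |
# | Drawdown                     | 49.000%   |
# | Expectancy                   | -0.048    |
# | Net Profit                   | -33.809%  |
# | Sharpe Ratio                 | -0.656    |
# | Probabilistic Sharpe Ratio   | 2.157%    |
# | Loss Rate                    | 44%       |
# | Win Rate                     | 56%       |
# | Profit-Loss Ratio            | 0.71      |
# | Alpha                        | -0.236    |
# | Beta                         | 0.036     |
# | Annual Standard Deviation    | 0.361     |
# | Annual Variance              | 0.13      |
# | Information Ratio            | -0.54     |
# | Tracking Error               | 0.39      |
# | Treynor Ratio                | -6.571    |
# | Total Fees                   | $3017.52  |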
import math
from collections import deque
from datetime import datetime, timedelta

import numpy as np
from scipy import stats

from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class conners_crash(QCAlgorithm):
    """Short-only Connors RSI strategy on a coarse-filtered equity universe.

    Every symbol that enters the universe gets a ``CustomConnors`` and a
    ``CustomVolatility`` indicator.  On each data slice the algorithm places
    short limit orders for symbols whose volatility value is above 100 and
    whose Connors RSI is above ``CRSI_entry``, and liquidates existing shorts
    whose Connors RSI has dropped below ``CRSI_exit``.
    """

    filteredByPrice = None

    def Initialize(self):
        """Configure the backtest window, cash, universe and strategy parameters."""
        self.SetStartDate(2018, 1, 1)
        self.SetEndDate(2018, 12, 31)
        self.SetCash(100000)

        # Universe settings are assigned before the universe is added so they
        # are certain to be in effect when the universe is created.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.Leverage = 1
        self.AddUniverse(self.CoarseSelectionFilter)
        self.SetWarmUp(150)
        #self.DefaultOrderProperties.TimeInForce = TimeInForce.Day

        # Per-symbol indicator registries, keyed by Symbol.
        self.indi = {}          # Connors RSI
        self.rsi = {}           # reserved for a plain RSI (never populated here)
        self.vola = {}          # annualised volatility
        self.indi_Filter = {}
        self.rsi_Filter = {}

        # Indicator look-back windows (number of daily closes).
        self.rsi_w = 3
        self.streak_w = 2
        self.pct_rank_w = 100
        self.vola_w = 100

        # Connors RSI thresholds: short above entry, cover below exit.
        self.CRSI_entry = 90
        self.CRSI_exit = 30

        # Upper bound on simultaneously open short positions.
        self.max_num_shorts = 20

        self.securities = []
        self.avtive_sec = []

    def CoarseSelectionFilter(self, coarse):
        """Return the symbols priced above 5 with volume above one million."""
        return [x.Symbol for x in coarse if x.Price > 5 and x.Volume > 1e6]

    def OnSecuritiesChanged(self, changes):
        """Create and warm up indicators for added symbols; drop them for removed ones."""
        for security in changes.AddedSecurities:
            symbol = security.Symbol

            # Connors RSI: register for daily updates, then seed from history.
            connors = CustomConnors('My_Custom', self.rsi_w, self.streak_w, self.pct_rank_w)
            self.indi[symbol] = connors
            self.RegisterIndicator(symbol, connors, Resolution.Daily)
            history = self.History(
                symbol, max(self.rsi_w, self.streak_w, self.pct_rank_w), Resolution.Daily)
            connors.WarmUp(history)

            # Volatility: same register-then-seed sequence.
            volatility = CustomVolatility('My_Custom', self.vola_w)
            self.vola[symbol] = volatility
            self.RegisterIndicator(symbol, volatility, Resolution.Daily)
            history = self.History(symbol, self.vola_w, Resolution.Daily)
            volatility.WarmUp(history)

        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            # The registries are keyed by Symbol, so the entry is dropped by
            # symbol; pop() with a default tolerates symbols never registered.
            for registry in (self.indi, self.rsi, self.vola):
                registry.pop(symbol, None)

    def OnData(self, data):
        """Place new short entries and liquidate shorts that hit the exit level.

        Entries are sent before exits, and the number of free slots is
        computed from the positions held at the start of the call.
        """
        # Warm-up slices only feed the indicators; no trading decisions.
        # NOTE(review): LEAN is expected to reject orders during warm-up
        # anyway - confirm against the engine version in use.
        if self.IsWarmingUp:
            return

        stocks_short = [
            holding.Symbol
            for holding in self.Portfolio.Values
            if holding.Invested and holding.IsShort
        ]

        # Entry candidates: volatility value above 100 and Connors RSI above entry.
        volatile = [symbol for symbol, vola in self.vola.items() if vola.Value > 100]
        short = [
            symbol for symbol in volatile
            if symbol in self.indi and self.indi[symbol].Value > self.CRSI_entry
        ]

        # Exit candidates: open shorts whose Connors RSI fell below exit.
        # Symbols without an indicator (dropped in OnSecuritiesChanged) are
        # skipped instead of raising KeyError.
        neutral = [
            symbol for symbol in stocks_short
            if symbol in self.indi and self.indi[symbol].Value < self.CRSI_exit
        ]

        # NOTE(review): resting limit orders and symbols that are already
        # short are not excluded from `short`, so exposure can exceed
        # max_num_shorts positions - confirm whether that is intended.
        open_slots = self.max_num_shorts - len(stocks_short)
        if open_slots > 0 and short:
            # Cash is split evenly over the slots free at the start of the call.
            cash_per_position = self.Portfolio.Cash / open_slots
            for ticker in short:
                if open_slots <= 0:
                    break
                price = self.History(ticker, 1, Resolution.Daily)
                if price.empty:
                    continue
                latest_close = price.close.iloc[-1]
                if not latest_close > 0:
                    # Also rejects NaN, which would break the sizing below.
                    continue
                quantity = int(cash_per_position * 0.95 / latest_close)
                if quantity <= 0:
                    continue
                #self.Securities[ticker].FeeModel = ConstantFeeModel(0) # for testing purpose
                # Sell limit 3% above the last daily close.
                self.LimitOrder(ticker, -quantity, 1.03 * latest_close)
                open_slots -= 1

        for ticker in neutral:
            self.Liquidate(ticker)

    def OnOrderEvent(self, orderEvent):
        """Log the order event when its order is filled or canceled."""
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if order.Status in (OrderStatus.Filled, OrderStatus.Canceled):
            self.Log(str(orderEvent))
class CustomConnors:
    """Connors RSI: mean of a price RSI, a streak RSI and a percent rank.

    The indicator keeps the most recent ``max(rsi_p, streak_p, pct_rank_p)``
    closes in chronological order (oldest first) and is ready once that
    window is full.  Each component uses the trailing ``*_p`` values of its
    input series, so ``rsi_p`` closes yield ``rsi_p - 1`` price changes.
    ``rsi_p`` and ``streak_p`` should be at least 2 and ``pct_rank_p`` at
    least 3, otherwise a component has nothing to average or compare.

    Attributes:
        Name, Time, IsReady, Value: the members read by the registering code.
        Rsi, Streak, Pct_Rank: the three components of ``Value`` (0..100).
    """

    def __init__(self, name, rsi_p, streak_p, pct_rank_p):
        period = max(rsi_p, streak_p, pct_rank_p)
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0
        self.Rsi = 0
        self.Streak = 0
        self.Pct_Rank = 0
        self.queue = deque(maxlen=period)
        self.rsi_p = rsi_p
        self.streak_p = streak_p
        self.pct_rank_p = pct_rank_p

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Time, self.Value)

    # Update method is mandatory
    def Update(self, input):
        """Consume a live bar exposing ``Time`` and ``Close``; return readiness."""
        return self.Update_Main(input.Time, input.Close)

    def Update_warmup(self, input):
        """Consume one ``itertuples()`` row exposing ``Index`` and ``close``."""
        index = input.Index
        # A multi-level index arrives as a tuple; the timestamp is taken from
        # its last element.  NOTE(review): assumes time is the last level.
        time = index[-1] if isinstance(index, tuple) else index
        return self.Update_Main(time, input.close)

    def Update_Main(self, time, value):
        """Append a close, recompute the indicator if ready, return readiness."""
        # Chronological order: after append() the newest close is last, so
        # the [-p:] slices below pick the most recent p observations.
        self.queue.append(value)
        self.Time = time
        self.IsReady = len(self.queue) == self.queue.maxlen
        if not self.IsReady:
            return False

        closes = np.array(self.queue, dtype=float)
        self.Rsi = self._rsi(closes[-self.rsi_p:])
        # The streak series is built over the whole window so runs longer
        # than streak_p are counted; only its tail feeds the streak RSI.
        self.Streak = self._rsi(self._streaks(closes)[-self.streak_p:])
        self.Pct_Rank = self._percent_rank(closes[-self.pct_rank_p:])
        self.Value = (self.Rsi + self.Streak + self.Pct_Rank) / 3.0
        return True

    @staticmethod
    def _rsi(values):
        """Return 100 - 100 / (1 + mean gain / mean loss) over ``values``."""
        diffs = np.diff(values)
        ups = np.nanmean(np.clip(diffs, 0, np.inf))
        downs = abs(np.nanmean(np.clip(diffs, -np.inf, 0)))
        if downs == 0:
            # No losses in the window: saturate at 100 and avoid dividing by zero.
            return 100
        return 100 - (100 / (1 + (ups / downs)))

    @staticmethod
    def _streaks(closes):
        """Return the signed run length of consecutive up/down closes.

        The first element has no predecessor and is 0; an unchanged close
        resets the run to 0.
        """
        streaks = np.zeros(len(closes))
        current = 0
        for i, (previous, latest) in enumerate(zip(closes, closes[1:]), start=1):
            if latest > previous:
                current = max(1, current + 1)
            elif latest < previous:
                current = min(-1, current - 1)
            else:
                current = 0
            streaks[i] = current
        return streaks

    @staticmethod
    def _percent_rank(closes):
        """Return the percentage of earlier one-bar returns below the latest one."""
        returns = np.diff(closes) / closes[:-1]
        past = returns[:-1]
        return np.count_nonzero(returns[-1] > past) / past.size * 100

    def WarmUp(self, history):
        """Feed every row of ``history.itertuples()`` through ``Update_warmup``."""
        for row in history.itertuples():
            self.Update_warmup(row)
class CustomVolatility:
    """Annualised volatility of daily log returns, expressed in percent.

    The indicator keeps the last ``period`` closes and is ready once that
    window is full.  ``Std`` is the population standard deviation of the
    log returns in the window; ``Value`` is ``Std * sqrt(252) * 100``.

    Attributes:
        Name, Time, IsReady, Value: the members read by the registering code.
        Std: un-annualised standard deviation of the log returns.
    """

    def __init__(self, name, period):
        self.Name = name
        self.Time = datetime.min
        self.IsReady = False
        self.Value = 0
        self.Std = 0
        self.queue = deque(maxlen=period)

    def __repr__(self):
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Time, self.Value)

    # Update method is mandatory
    def Update(self, input):
        """Consume a live bar exposing ``Time`` and ``Close``; return readiness."""
        return self.Update_Main(input.Time, input.Close)

    def Update_warmup(self, input):
        """Consume one ``itertuples()`` row exposing ``Index`` and ``close``."""
        index = input.Index
        # A multi-level index arrives as a tuple; the timestamp is taken from
        # its last element.  NOTE(review): assumes time is the last level.
        time = index[-1] if isinstance(index, tuple) else index
        return self.Update_Main(time, input.close)

    def Update_Main(self, time, value):
        """Append a close, recompute the volatility if ready, return readiness."""
        # Stored oldest-first; the standard deviation does not depend on the
        # direction of the window, only on which closes it contains.
        self.queue.append(value)
        self.Time = time
        self.IsReady = len(self.queue) == self.queue.maxlen
        if self.IsReady:
            log_returns = np.diff(np.log(np.array(self.queue, dtype=float)))
            self.Std = log_returns.std()
            # 252 trading days per year; the factor 100 converts to percent.
            self.Value = self.Std * math.sqrt(252) * 100
        return self.IsReady

    def WarmUp(self, history):
        """Feed every row of ``history.itertuples()`` through ``Update_warmup``."""
        for row in history.itertuples():
            self.Update_warmup(row)