# Overall Statistics
#
# Total Trades:                 398
# Average Win:                  1.03%
# Average Loss:                 -0.77%
# Compounding Annual Return:    9.891%
# Drawdown:                     18.800%
# Expectancy:                   0.232
# Net Profit:                   40.867%
# Sharpe Ratio:                 0.568
# Probabilistic Sharpe Ratio:   17.130%
# Loss Rate:                    47%
# Win Rate:                     53%
# Profit-Loss Ratio:            1.33
# Alpha:                        0.102
# Beta:                         -0.035
# Annual Standard Deviation:    0.17
# Annual Variance:              0.029
# Information Ratio:            -0.174
# Tracking Error:               0.263
# Treynor Ratio:                -2.735
# Total Fees:                   $41411.49
# Estimated Strategy Capacity:  $17000000.00
# Lowest Capacity Asset:        AJG R735QTJ8XC9X
import numpy as np
import pandas as pd
from scipy import stats
class CryingVioletKitten(QCAlgorithm):
    """Monthly-rebalanced equity momentum strategy.

    The universe is refreshed once per calendar month from the largest
    companies by market cap.  On each month-start rebalance every tracked
    asset is scored with the annualised slope of a linear regression on its
    log closes, weighted by R^2.  Holdings that fall out of the top slice of
    that ranking are liquidated, surviving holdings are resized when their
    target changed significantly, and new positions are opened only while
    the market trend filter (SPY above its moving average) allows it.
    """

    def Initialize(self):
        """Set dates, cash, strategy parameters, the schedule and the universe."""
        self.SetStartDate(2018, 1, 27)  # Set Start Date
        self.SetCash(10000000)  # Set Strategy Cash

        self.pickedUniverse = False
        self.lastMonth = -1  # month of the last universe refresh (-1 = never)
        self.UniverseSettings.Resolution = Resolution.Daily
        self.market = self.AddEquity("SPY", Resolution.Daily).Symbol

        self.market_window = 200
        self.atr_window = 20
        self.talib_window = self.atr_window + 5
        # 0.01 = less position, more % but more risk.
        # Default for strategy is .001 - 0.00075
        self.risk_factor = 0.002
        self.momentum_window_length = 90
        self.market_cap_limit = 700
        self.rank_table_percentile = .3
        self.significant_position_difference = 0.1
        self.min_momentum = 0.020
        self.leverage_factor = 1  # 1=2154%. Guy's version is 1.4=3226%
        self.use_stock_trend_filter = False  # either False = Off, True = On
        self.sma_window_length = 100  # Used for the stock trend filter
        self.use_market_trend_filter = 1  # either 0 = Off, 1 = On. Filter on SPY
        # either 0 = Off, 1 = On. Manage risk with individual stock volatility
        self.use_average_true_range = 0
        # Change the weight of the ATR. 1327%
        self.average_true_rage_multipl_factor = 1

        self.assets = []  # symbols currently tracked for ranking
        self.assetATR = {}  # Symbol -> ATR indicator

        # Schedule my rebalance function
        self.Schedule.On(
            self.DateRules.MonthStart("SPY"),
            self.TimeRules.BeforeMarketClose(self.market, 0),
            self.Rebalance,
        )
        # CancelOpenOrders is available but intentionally not scheduled.

        self.AddUniverse(self.Coarse, self.Fine)

    def ConvertToQuantopianHistory(self, OHCL, history, isMarket=False):
        """Pivot a history frame into one column of prices per symbol.

        Args:
            OHCL: which bar field to extract: "high", "low", "close" or
                "open".  Any other value leaves every column empty.
            history: frame whose ``itertuples()`` rows carry the symbol in
                ``Index[0]`` plus ``open``/``high``/``low``/``close`` fields.
            isMarket: when False, every symbol that does not have exactly
                ``talib_window`` rows is dropped from the result, from
                ``self.assets`` and ``self.assetATR``, and removed from the
                algorithm via ``RemoveSecurity``.

        Returns:
            A DataFrame with one column per surviving symbol.  If anything
            goes wrong the failure is reported through ``Debug`` and an
            empty DataFrame is returned, so callers always get a frame.
        """
        known_fields = ("high", "low", "close", "open")
        prices = {}
        try:
            for row in history.itertuples():
                column = prices.setdefault(row.Index[0], [])
                if OHCL in known_fields:
                    column.append(getattr(row, OHCL))

            if not isMarket:
                incomplete = [
                    symbol for symbol, column in prices.items()
                    if len(column) != self.talib_window
                ]
                for symbol in incomplete:
                    prices.pop(symbol, None)
                    if symbol in self.assets:
                        self.assets.remove(symbol)
                    self.assetATR.pop(symbol, None)
                    self.RemoveSecurity(symbol)

            return pd.DataFrame.from_dict(prices)
        except Exception as error:
            # Deliberately best-effort: report and hand back an empty frame
            # rather than a half-built dict the callers cannot index.
            self.Debug(f"Failed: {error!r}")
            return pd.DataFrame()

    def SignificantChangeInPositionSize(self, new_position_size, old_position_size):
        """Return True when the relative size change exceeds the threshold.

        The change is measured relative to ``old_position_size`` and compared
        with ``significant_position_difference``.  A move away from an old
        size of zero has no finite relative change, so it counts as
        significant whenever the new size is non-zero.
        """
        if old_position_size == 0:
            return new_position_size != 0
        relative_change = np.abs(
            (new_position_size - old_position_size) / old_position_size
        )
        return bool(relative_change > self.significant_position_difference)

    def GetPositionSize(self, security):
        """Return the risk-based position size for ``security``.

        The size is ``TotalPortfolioValue * risk_factor`` divided by
        ``ATR * average_true_rage_multipl_factor``.  With
        ``use_average_true_range`` off the divisor is 1.  When ATR sizing is
        on and no indicator is cached for the symbol, one is created and
        warmed up from history first.  Returns 0 when the ATR-based divisor
        is not positive.
        """
        risk_budget = self.Portfolio.TotalPortfolioValue * self.risk_factor
        if not self.use_average_true_range:
            return risk_budget  # dividing by 1 leaves the budget unchanged

        atr = self.assetATR.get(security)
        if atr is None:
            atr = self._warmed_up_atr(security)
            self.assetATR[security] = atr

        divisor = atr.Current.Value * self.average_true_rage_multipl_factor
        if divisor <= 0:
            return 0
        return risk_budget / divisor

    def _warmed_up_atr(self, symbol):
        """Create an ATR indicator for ``symbol`` and feed it recent daily bars."""
        atr = self.ATR(symbol, self.atr_window, MovingAverageType.Simple, Resolution.Daily)
        history = self.History(symbol, self.atr_window, Resolution.Daily)
        if history.empty:
            return atr  # nothing to warm up with; the indicator stays cold
        for row in history.loc[symbol].itertuples():
            bar = TradeBar(row.Index, symbol, row.open, row.high, row.low, row.close, row.volume)
            atr.Update(bar)
        return atr

    def Rebalance(self):
        """Exit, resize and enter positions according to the momentum ranking."""
        history = self.History(self.assets, self.talib_window, Resolution.Daily)
        closes = self.ConvertToQuantopianHistory("close", history)
        ranking_table = self._rank_by_momentum(closes)
        estimated_cash_balance = self.Portfolio.Cash

        # Close positions that are no longer in the top of the ranking table
        # and resize the ones that stay.
        held = [x.Key for x in self.Portfolio if x.Value.Invested]
        for security in held:
            price = self.Securities[security].Price
            position_size = self.Portfolio[security].Quantity

            if security not in ranking_table.index:
                self.Liquidate(security)
                estimated_cash_balance += price * position_size
                continue

            new_position_size = self.GetPositionSize(security)
            if not self.SignificantChangeInPositionSize(new_position_size, position_size):
                continue

            target_quantity = int(new_position_size * self.leverage_factor)
            if target_quantity == 0:
                self.Liquidate(security)
            else:
                # MarketOrder takes a quantity to trade, not a target
                # holding, so order only the difference to the target.
                order_quantity = int(target_quantity - position_size)
                if order_quantity != 0:
                    self.MarketOrder(security, order_quantity)
            estimated_cash_balance -= price * (target_quantity - position_size)

        if not self._market_trend_allows_entries():
            return

        for security in ranking_table.index:
            if self.Portfolio[security].Invested:
                continue
            new_position_size = self.GetPositionSize(security)
            position = new_position_size * self.leverage_factor
            estimated_cost = self.Securities[security].Price * position
            if estimated_cash_balance > estimated_cost:
                quantity = int(position)
                if quantity != 0:  # a zero-share order is not a valid order
                    self.MarketOrder(security, quantity)
                estimated_cash_balance -= estimated_cost

    def _rank_by_momentum(self, closes):
        """Return the momentum scores of the top-ranked tracked assets.

        Scores at or below ``min_momentum`` are discarded first; of the rest
        only those above the ``1 - rank_table_percentile`` quantile are kept.
        The result is a Series indexed by symbol (empty when nothing
        qualifies).
        """
        # Assets without any history rows never made it into ``closes``.
        tradable = [symbol for symbol in self.assets if symbol in closes.columns]
        if not tradable:
            return pd.Series(dtype=float)

        # NOTE(review): the history request above covers talib_window bars,
        # so tail(momentum_window_length) cannot return more than that --
        # confirm whether a longer lookback was intended.
        log_closes = np.log(closes[tradable].tail(self.momentum_window_length))
        slopes = log_closes.apply(self._slope)
        slopes = slopes[slopes > self.min_momentum]
        return slopes[slopes > slopes.quantile(1 - self.rank_table_percentile)]

    def _market_trend_allows_entries(self):
        """Return True when new positions may be opened.

        With the market trend filter off this is always True.  Otherwise the
        current SPY price must be above the mean of its last
        ``market_window`` daily closes; missing market history blocks entries.
        """
        if not self.use_market_trend_filter:
            return True
        market_history = self.History(self.market, self.market_window, Resolution.Daily)
        market_closes = self.ConvertToQuantopianHistory("close", market_history, True)
        if market_closes.empty:
            return False
        average_market_price = market_closes.mean().iloc[0]
        return self.Securities[self.market].Price > average_market_price

    def CancelOpenOrders(self):
        """Cancel every order that is still open."""
        for order in self.Transactions.GetOpenOrders():
            self.Transactions.CancelOrder(order.Id)

    def OnData(self, data):
        """No per-bar logic; all trading happens in the scheduled Rebalance."""
        pass

    def _slope(self, ts):
        """Score a series: annualised regression slope weighted by R^2.

        ``ts`` is regressed against 0..len(ts)-1.  Rebalance passes log
        closes, so the slope is a per-bar log return; it is compounded over
        250 bars and multiplied by the squared correlation coefficient.
        """
        x = np.arange(len(ts))
        slope, _intercept, r_value, _p_value, _std_err = stats.linregress(x, ts)
        annualized_slope = (1 + slope) ** 250
        return annualized_slope * (r_value ** 2)

    def OnSecuritiesChanged(self, changes):
        """Keep ``assets`` and ``assetATR`` in step with universe changes.

        Removed securities are liquidated and forgotten.  Added securities
        get a warmed-up ATR indicator and, unless they are the market
        symbol, join the list of tracked assets.
        """
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            self.Liquidate(symbol)
            if symbol in self.assets:
                self.assets.remove(symbol)
            self.assetATR.pop(symbol, None)

        for security in changes.AddedSecurities:
            symbol = security.Symbol
            # Key by Symbol so GetPositionSize and
            # ConvertToQuantopianHistory find the same entry.
            if symbol not in self.assetATR:
                self.assetATR[symbol] = self._warmed_up_atr(symbol)
            if symbol == self.market:
                continue
            if symbol not in self.assets:
                self.assets.append(symbol)

    def Coarse(self, coarse):
        """Coarse selection, refreshed at most once per calendar month.

        Returns ``Universe.Unchanged`` when the universe was already
        refreshed this month; otherwise the symbols that have fundamental
        data, a positive dollar volume and a positive price.
        """
        if self.lastMonth == self.Time.month:
            return Universe.Unchanged
        self.lastMonth = self.Time.month
        return [
            x.Symbol for x in coarse
            if x.HasFundamentalData and x.DollarVolume > 0 and x.Price > 0
        ]

    def Fine(self, fine):
        """Fine selection: largest companies, primary shares, no ADRs.

        The ``market_cap_limit`` largest entries by market cap are taken
        first and then restricted to primary shares that are not depositary
        receipts.  With ``use_stock_trend_filter`` on, only stocks priced
        above their ``sma_window_length``-bar simple moving average of daily
        closes are kept; stocks without history are dropped.
        """
        largest = sorted(fine, key=lambda x: x.MarketCap, reverse=True)[:self.market_cap_limit]
        candidates = [
            x for x in largest
            if x.SecurityReference.IsPrimaryShare
            and not x.SecurityReference.IsDepositaryReceipt
        ]
        candidate_symbols = [x.Symbol for x in candidates]

        if self.use_stock_trend_filter:
            history = self.History(candidate_symbols, self.sma_window_length, Resolution.Daily)
            sma = {}
            for symbol in candidate_symbols:
                try:
                    bars = history.loc[symbol]
                except KeyError:
                    continue  # no history for this symbol; cannot evaluate the trend
                indicator = SimpleMovingAverage(self.sma_window_length)
                for row in bars.itertuples():
                    indicator.Update(row.Index, row.close)
                sma[symbol] = indicator.Current.Value
            final = [
                x.Symbol for x in candidates
                if x.Symbol in sma and x.Price > sma[x.Symbol]
            ]
        else:
            final = candidate_symbols

        self.pickedUniverse = True
        return final