| Overall Statistics | |
|---|---|
| Total Trades | 0 |
| Average Win | 0% |
| Average Loss | 0% |
| Compounding Annual Return | 0% |
| Drawdown | 0% |
| Expectancy | 0 |
| Net Profit | 0% |
| Sharpe Ratio | 0 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 0% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0 |
| Annual Variance | 0 |
| Information Ratio | -0.566 |
| Tracking Error | 0.162 |
| Treynor Ratio | 0 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | |
#region imports
from AlgorithmImports import *
#endregion
from risk import TrailingStop
import statistics as stat
import pickle
from collections import deque
class NadionResistanceShield(QCAlgorithm):
#class DataConsolidationAlgorithm(QCAlgorithm):
def Initialize(self):
    """Set up dates, cash, state, universe selection, indicators and schedules.

    The tradable universe is produced dynamically by ``CoarseFilter`` /
    ``FineFilter``, so ``self.tickers`` is still empty while this method
    runs.  The per-ticker indicator loop therefore never executed and
    ``symbolDataBySymbol`` stayed empty, leaving every signal handler with
    nothing to iterate.  Indicator creation now lives in
    ``_register_symbol_data`` and is also driven from
    ``OnSecuritiesChanged`` so universe-selected securities get a
    ``SymbolData`` entry.
    """
    self.SetStartDate(2017, 1, 1)  # Set Start Date
    #self.SetEndDate(2021, 1, 1)
    self.SetCash(10000)  # Set Strategy Cash

    self.symbolDataBySymbol = {}
    self.trade = True
    self.tickers = []
    self.atr = []
    self.MarketCaps = ["SPY", "QQQ"]  # ,"MDY","IWM"]
    self.marketDataBySymbol = {}
    self.spy = "SPY"
    self.iwm = "IWM"
    self.mdy = "MDY"
    self.qqq = "QQQ"
    self.tech_ROA_key = 'TECH_ROA'
    #self.AddRiskManagement(TrailingStop)

    # Trailing distance in $
    self.trail_dist = 5
    # Ticket of the current stop-loss order, if any.
    self.sl_order = None
    # Last trail level used; intended for deciding whether to move the stop.
    self.last_trail_level = None

    self.AddUniverseSelection(
        FineFundamentalUniverseSelectionModel(self.CoarseFilter, self.FineFilter)
    )
    self.UniverseSettings.Resolution = Resolution.Hour
    self.curr_month = -1
    # store ROA of tech stocks
    self.tech_ROA = {}
    self.symbols = None
    if self.LiveMode and not self.ObjectStore.ContainsKey(self.tech_ROA_key):
        self.Quit('QUITTING: USING LIVE MOVE WITHOUT TECH_ROA VALUES IN OBJECT STORE')
    self.quarters = 0
    #self.SetWarmUp(900)

    # Market-regime ETFs with their 50/200-day simple moving averages.
    for symbolmark in self.MarketCaps:
        symbol = self.AddEquity(symbolmark, Resolution.Daily).Symbol
        sma50 = self.SMA(symbol, 50, Resolution.Daily, Field.Close)
        sma200 = self.SMA(symbol, 200, Resolution.Daily, Field.Close)
        self.marketDataBySymbol[symbol] = symbolMarkData(symbol, sma50, sma200)

    # Manually seeded tickers (none by default; the universe supplies them).
    for ticker in self.tickers:
        symbol = self.AddEquity(ticker, Resolution.Hour).Symbol
        self._register_symbol_data(symbol)

    self.spy = self.AddEquity("SPY", Resolution.Daily)

    # Before the open
    self.Schedule.On(self.DateRules.EveryDay("SPY"),
                     self.TimeRules.AfterMarketOpen("SPY", -5),
                     Action(self.beforeTheOpen))
    #set the following between 1 - 4 hours depending on buy frequency
    self.Schedule.On(self.DateRules.EveryDay("SPY"),
                     self.TimeRules.Every(timedelta(hours=6.5)),
                     self.buySignals)
    #self.TimeRules.EveryDay,
    #self.buySignals)
    self.Schedule.On(self.DateRules.EveryDay("SPY"),
                     self.TimeRules.Every(timedelta(hours=6.5)),
                     self.sellSignals)
    self.Schedule.On(self.DateRules.EveryDay("SPY"),
                     self.TimeRules.Every(timedelta(hours=2)),
                     self.sellProfitSignals)
    self.Schedule.On(self.DateRules.EveryDay("SPY"),
                     self.TimeRules.AfterMarketOpen("SPY"),
                     self.tradeStart)
    self.Schedule.On(self.DateRules.EveryDay("SPY"),
                     self.TimeRules.BeforeMarketClose("SPY"),
                     self.tradeEnd)
    #self.AddRiskManagement(TrailingStop)
    #self.AddRiskManagement(TrailingStopRiskManagementMode(0.04))


def _register_symbol_data(self, symbol):
    """Create the indicator set for ``symbol`` and store it in ``symbolDataBySymbol``.

    Does nothing when the symbol is already registered, so it is safe to call
    from both ``Initialize`` and ``OnSecuritiesChanged``.
    """
    if symbol in self.symbolDataBySymbol:
        return

    ema10 = self.EMA(symbol, 10, Resolution.Hour, Field.Close)
    sma200 = self.SMA(symbol, 200, Resolution.Daily, Field.Close)
    sma7 = self.SMA(symbol, 7, Resolution.Hour, Field.Close)
    sma20 = self.SMA(symbol, 20, Resolution.Daily, Field.Close)
    self.sma = self.SMA(symbol, 20, Resolution.Hour, Field.Close)
    # NOTE(review): these three are named "ema" but are built as simple
    # moving averages; kept unchanged -- confirm which was intended.
    ema100 = self.SMA(symbol, 100, Resolution.Daily, Field.Close)
    ema200 = self.SMA(symbol, 200, Resolution.Daily, Field.Close)
    ema300 = self.SMA(symbol, 300, Resolution.Daily, Field.Close)
    ema20 = self.EMA(symbol, 20, Resolution.Hour, Field.Close)
    ema50 = self.EMA(symbol, 50, Resolution.Hour, Field.Close)
    rsi = self.RSI(symbol, 14, Resolution.Daily)
    wilr = self.WILR(symbol, 14, Resolution.Daily)
    wilr_fast = self.WILR(symbol, 10, Resolution.Daily)
    atr = self.ATR(symbol, 20, Resolution.Daily)
    self.atr.append(self.ATR(symbol, 7, Resolution.Daily))
    high = self.MAX(symbol, 10, Resolution.Daily, Field.High)
    low = self.MIN(symbol, 5, Resolution.Hour, Field.Low)
    self.sma.Updated += self.OnSMA

    self.symbolDataBySymbol[symbol] = SymbolData(
        symbol, ema10, sma20, sma200, sma7, ema100, ema200, ema300,
        ema20, ema50, rsi, wilr, wilr_fast, high, low, atr)


def OnSecuritiesChanged(self, changes):
    """Keep ``symbolDataBySymbol`` in sync with the selected universe.

    Added securities (other than the market ETFs tracked in
    ``marketDataBySymbol``) get their indicators registered.  Removed
    securities are dropped and, if still held, liquidated, because the
    signal handlers only look at symbols present in ``symbolDataBySymbol``.
    """
    for security in changes.AddedSecurities:
        symbol = security.Symbol
        if symbol in self.marketDataBySymbol:
            continue
        self._register_symbol_data(symbol)

    for security in changes.RemovedSecurities:
        symbol = security.Symbol
        was_tracked = self.symbolDataBySymbol.pop(symbol, None) is not None
        if was_tracked and self.Portfolio[symbol].Invested:
            self.Liquidate(symbol, "Removed from universe")
def OnEndOfAlgorithm(self) -> None:
    """Log that the run has finished, then persist the ROA history via ``SaveData``."""
    self.Log('Algorithm End')
    self.SaveData()
def SaveData(self):
    """Pickle the per-symbol ROA history and write it to the ObjectStore.

    The history is stored under ``self.tech_ROA_key`` and keyed by ticker
    string (``symbol.Value``) because Symbol objects are not picklable.
    """
    by_ticker = {}
    for symbol, roa_history in self.tech_ROA.items():
        by_ticker[symbol.Value] = roa_history
    payload = pickle.dumps(by_ticker)
    self.ObjectStore.SaveBytes(self.tech_ROA_key, payload)
def CoarseFilter(self, coarse):
    """Coarse universe filter: restore saved ROA history, then select quarterly.

    On the first call with no in-memory ROA history, the history saved in the
    ObjectStore (if any) is restored and its symbols are returned so that
    ``FineFilter`` runs on them.  Afterwards the filter passes every symbol
    with fundamental data once per quarter (months 1, 4, 7, 10) and returns
    ``Universe.Unchanged`` at all other times.

    The restore is attempted at most once and only short-circuits when it
    yields data.  Previously an empty saved dictionary was reloaded on every
    call and an empty universe returned forever, so the algorithm could never
    start collecting history again.
    """
    if not self.tech_ROA and not getattr(self, '_tech_ROA_restore_attempted', False):
        self._tech_ROA_restore_attempted = True
        if self.ObjectStore.ContainsKey(self.tech_ROA_key):
            raw = self.ObjectStore.ReadBytes(self.tech_ROA_key)
            # NOTE: pickle.loads can execute arbitrary code; this is only
            # acceptable because the blob is written by SaveData itself.
            stored = pickle.loads(bytearray(raw))
            self.tech_ROA = {
                Symbol.Create(ticker, SecurityType.Equity, Market.USA): ROA
                for ticker, ROA in stored.items()
            }
            if self.tech_ROA:
                return list(self.tech_ROA.keys())

    # Only act once per calendar month.
    if self.curr_month == self.Time.month:
        return Universe.Unchanged
    self.curr_month = self.Time.month

    # we only want to update our ROA values every three months
    if self.Time.month % 3 != 1:
        return Universe.Unchanged

    self.quarters += 1
    self.coarselist = [c.Symbol for c in coarse if c.HasFundamentalData]
    return self.coarselist
def FineFilter(self, fine):
    """Fine universe filter: record quarterly tech ROA and pick high G-score stocks.

    Every call appends the three-month ROA of each technology stock to a
    12-quarter history.  In April, once 12 quarters have been collected (or
    always in live mode), technology stocks in the lowest quarter of
    book-to-market are scored and those scoring at least 5 out of 7 are
    returned and stored in ``self.tickers``.

    Returns ``Universe.Unchanged`` when it is not time to rebalance or when
    there is too little data to compute the medians.
    """
    # book value == FinancialStatements.BalanceSheet.NetTangibleAssets (book value and NTA are synonyms)
    # BM (Book-to-Market) == book value / MarketCap
    # ROA == OperationRatios.ROA
    # CFROA == FinancialStatements.CashFlowStatement.OperatingCashFlow / FinancialStatements.BalanceSheet.TotalAssets
    # R&D to MktCap == FinancialStatements.IncomeStatement.ResearchAndDevelopment / MarketCap
    # CapEx to MktCap == FinancialStatements.CashFlowStatement.CapExReported / MarketCap
    # Advertising to MktCap == FinancialStatements.IncomeStatement.SellingGeneralAndAdministration / MarketCap
    #   note: this parameter may be slightly higher than pure advertising costs
    tech_securities = [
        f for f in fine
        if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Technology
        and f.OperationRatios.ROA.ThreeMonths
    ]

    for security in tech_securities:
        # we use deques instead of RWs since deques are picklable
        # 3 years * 4 quarters = 12 quarters of data
        history = self.tech_ROA.setdefault(security.Symbol, deque(maxlen=12))
        history.append(security.OperationRatios.ROA.ThreeMonths)

    if self.LiveMode:
        # this ensures we don't lose new data from an algorithm outage
        self.SaveData()

    # we want to rebalance in the fourth month after the (fiscal) year ends
    # so that we have the most recent quarter's data
    if self.Time.month != 4 or (self.quarters < 12 and not self.LiveMode):
        return Universe.Unchanged

    # make sure our stocks have these fundamentals
    tech_securities = [
        x for x in tech_securities
        if x.OperationRatios.ROA.OneYear
        and x.FinancialStatements.CashFlowStatement.OperatingCashFlow.TwelveMonths
        and x.FinancialStatements.BalanceSheet.TotalAssets.TwelveMonths
        and x.FinancialStatements.IncomeStatement.ResearchAndDevelopment.TwelveMonths
        and x.FinancialStatements.CashFlowStatement.CapExReported.TwelveMonths
        and x.FinancialStatements.IncomeStatement.SellingGeneralAndAdministration.TwelveMonths
        and x.MarketCap
    ]
    # statistics.median raises StatisticsError on empty data, so bail out
    # before computing any median if nothing passed the filter.
    if not tech_securities:
        return Universe.Unchanged

    # compute the variance of the ROA for each tech stock with a full history
    tech_VARROA = {
        symbol: stat.variance(ROA)
        for symbol, ROA in self.tech_ROA.items()
        if len(ROA) == ROA.maxlen
    }
    if len(tech_VARROA) < 2:
        return Universe.Unchanged
    tech_VARROA_median = stat.median(tech_VARROA.values())

    def ratio_and_median(ratio_of):
        """Map each tech Symbol to ``ratio_of(security)`` and return (mapping, median)."""
        ratios = {x.Symbol: ratio_of(x) for x in tech_securities}
        return ratios, stat.median(ratios.values())

    # ROA 1-year
    tech_ROA1Y, tech_ROA1Y_median = ratio_and_median(
        lambda x: x.OperationRatios.ROA.OneYear)
    # Cash Flow ROA
    tech_CFROA, tech_CFROA_median = ratio_and_median(
        lambda x: x.FinancialStatements.CashFlowStatement.OperatingCashFlow.TwelveMonths
        / x.FinancialStatements.BalanceSheet.TotalAssets.TwelveMonths)
    # R&D to MktCap
    tech_RD2MktCap, tech_RD2MktCap_median = ratio_and_median(
        lambda x: x.FinancialStatements.IncomeStatement.ResearchAndDevelopment.TwelveMonths
        / x.MarketCap)
    # CapEx to MktCap
    tech_CaPex2MktCap, tech_CaPex2MktCap_median = ratio_and_median(
        lambda x: x.FinancialStatements.CashFlowStatement.CapExReported.TwelveMonths
        / x.MarketCap)
    # Advertising to MktCap
    tech_Ad2MktCap, tech_Ad2MktCap_median = ratio_and_median(
        lambda x: x.FinancialStatements.IncomeStatement.SellingGeneralAndAdministration.TwelveMonths
        / x.MarketCap)

    # sort fine by book-to-market ratio and keep the lowest quarter
    # (the slice is len // 4, i.e. a quartile rather than a quintile)
    has_book = [
        f for f in fine
        if f.FinancialStatements.BalanceSheet.NetTangibleAssets.TwelveMonths and f.MarketCap
    ]
    sorted_by_BM = sorted(
        has_book,
        key=lambda x: x.FinancialStatements.BalanceSheet.NetTangibleAssets.TwelveMonths / x.MarketCap,
    )[:len(has_book) // 4]

    # choose tech stocks from that low book-to-market group; a set of Symbols
    # replaces the per-item scan of the tech_securities list
    eligible_tech = set(tech_ROA1Y)
    tech_symbols = [f.Symbol for f in sorted_by_BM if f.Symbol in eligible_tech]

    ratioDicts_medians = [
        (tech_ROA1Y, tech_ROA1Y_median),
        (tech_CFROA, tech_CFROA_median),
        (tech_RD2MktCap, tech_RD2MktCap_median),
        (tech_CaPex2MktCap, tech_CaPex2MktCap_median),
        (tech_Ad2MktCap, tech_Ad2MktCap_median),
    ]

    def compute_g_score(symbol):
        """Return the 0-7 score: one point per satisfied criterion."""
        g_score = 0
        # cash-flow ROA above reported one-year ROA
        if tech_CFROA[symbol] > tech_ROA1Y[symbol]:
            g_score += 1
        # ROA variance below the median
        if symbol in tech_VARROA and tech_VARROA[symbol] < tech_VARROA_median:
            g_score += 1
        # each of the five ratios above its median
        for ratio_dict, median in ratioDicts_medians:
            if symbol in ratio_dict and ratio_dict[symbol] > median:
                g_score += 1
        return g_score

    # compute g-scores for each symbol
    g_scores = {symbol: compute_g_score(symbol) for symbol in tech_symbols}
    self.tickers = [symbol for symbol, g_score in g_scores.items() if g_score >= 5]
    return self.tickers
def beforeTheOpen(self) -> None:
    """Scheduled callback: log the latest close of the security held in ``self.spy``."""
    spy_close = self.spy.Close
    self.Log("SPY: {0}".format(spy_close))
    #for i in range(len(self.tickers)):
    #    self.Log("ATR: {0}".format(self.atr[i].Current.Value))
def OnOrderEvent(self, OrderEvent):
    """Log the details of an order event that carries a fill.

    Events with a zero fill quantity are ignored.

    :OrderEvent: the order event delivered by the engine.
    """
    if OrderEvent.FillQuantity == 0:
        return

    # Look up the order itself to read its tag.
    filled_order = self.Transactions.GetOrderById(OrderEvent.OrderId)

    template = ("ORDER NOTIFICATION >> {} >> Status: {} Symbol: {}. Quantity: "
                "{}. Direction: {}. Fill Price {}")
    details = (
        filled_order.Tag,
        OrderEvent.Status,
        OrderEvent.Symbol,
        OrderEvent.FillQuantity,
        OrderEvent.Direction,
        OrderEvent.FillPrice,
    )
    self.Log(template.format(*[str(item) for item in details]))
def OnData(self, data):
    """Log OHLC, stop-loss and account details for each tracked symbol in ``data``.

    For every symbol in ``symbolDataBySymbol`` present in the slice, the bar
    is logged and ``self.base_sl_level`` is set to 98% of the close.

    A symbol whose bar lacks OHLC attributes is now logged by name and
    skipped.  Previously the log message left its ``{}`` placeholder
    unfilled and the method returned, dropping all remaining symbols.
    """
    # Slice-level check; it does not depend on the symbol, so test it once.
    if not data.HasData:
        return

    for symbol in self.symbolDataBySymbol:
        if not data.ContainsKey(symbol):
            continue

        # Even then, occasionally, there is no Open attribute
        try:
            bar = data[symbol]
            O = round(bar.Open, 2)
            H = round(bar.High, 2)
            L = round(bar.Low, 2)
            C = round(bar.Close, 2)
        except AttributeError:
            self.Log('>> {} >> Missing Data'.format(symbol))
            continue

        # Account aliases used in the log lines below.
        holdings = self.Portfolio[symbol].Quantity
        value = self.Portfolio.TotalPortfolioValue
        cash = self.Portfolio.Cash

        # Calculate our base SL level. This is used for the initial entry.
        # We will also use it to compare to the previous trail level.
        self.base_sl_level = round(C * .98, 2)
        #base_sl_level = round(C - self.trail_dist,2)

        # Log OHLC - This can be useful for debugging to see where price is moving
        self.Log('>> {} >> ON DATA >> >> >> >> >> >>'.format(symbol))
        self.Log('>> OHLC >> O:{} H:{} L:{} C:{}'.format(O, H, L, C))
        self.Log('>> SL >> Base Level:{} Last Trail Level:{}'.format(self.base_sl_level, self.last_trail_level))
        self.Log('>> Account >> Cash:{}, Val:{}, Holdings:{}'.format(cash, value, holdings))
def tradeStart(self) -> None:
    """Scheduled callback: enable the signal handlers by setting ``self.trade``."""
    self.trade = True
def tradeEnd(self) -> None:
    """Scheduled callback: disable the signal handlers by clearing ``self.trade``."""
    self.trade = False
def buySignals(self):
    """Scheduled callback: open a 0.2-weight position in every tracked symbol not yet held.

    Does nothing while ``self.trade`` is False.  The earlier entry filters
    (benchmark vs. SMA200, RSI/SMA and EMA-crossover conditions, breakout
    above the tracked high) were commented out in the original, so the only
    remaining condition is that the symbol is not already invested.
    """
    if not self.trade:
        return

    for symbol in self.symbolDataBySymbol:
        if self.Portfolio[symbol].Invested:
            continue
        self.SetHoldings(symbol, .2, False, "Buy Signal")
def sellSignals(self):
    """Scheduled callback: liquidate held symbols whose close is above the tracked low.

    Does nothing while ``self.trade`` is False.  The RSI stop-order and
    SMA/ATR exits were commented out in the original and are not reproduced.
    """
    if not self.trade:
        return

    for symbol, indicators in self.symbolDataBySymbol.items():
        if not self.Portfolio[symbol].Invested:
            continue
        close = self.Securities[symbol].Close
        if close > indicators.low.Current.Value:
            self.Liquidate(symbol, "Sell Signal")
def sellProfitSignals(self):
    """Scheduled callback: liquidate held symbols whose close is above the tracked high.

    Does nothing while ``self.trade`` is False.
    """
    if not self.trade:
        return

    for symbol, indicators in self.symbolDataBySymbol.items():
        if not self.Portfolio[symbol].Invested:
            continue
        close = self.Securities[symbol].Close
        if close > indicators.high.Current.Value:
            self.Liquidate(symbol, "Sell Signal")

# Update our trailing stop loss as necessary
#def TrailingUpdate(self):
#    if self.trade == False:
#        return
#    for symbol, symbolData in self.symbolDataBySymbol.items():
#        if self.Portfolio[symbol].Invested
def OnSMA(self, sender, updated) -> None:
    """Handler attached to ``self.sma.Updated``; currently a placeholder with no effect.

    ``sender`` and ``updated`` are accepted to match the event signature but
    are not used.
    """
    if self.sma.IsReady:
        #self.Debug(f"SMA Updated on {self.Time} with value: {self.sma.Current.Value}")
        pass
    return None
class symbolMarkData:
    """Holds a market symbol together with its 50- and 200-period SMA indicators."""

    def __init__(self, symbol, sma50, sma200):
        """Store the symbol and the two indicator objects unchanged."""
        self.Symbol, self.sma50, self.sma200 = symbol, sma50, sma200
class SymbolData:
    """Holds a traded symbol together with the indicator objects created for it."""

    def __init__(self, symbol, ema10, sma20, sma200, sma7, ema100, ema200, ema300, ema20, ema50, rsi, wilr, wilr_fast, high, low, atr):
        """Store every argument on the instance under its own name (``symbol`` as ``Symbol``)."""
        self.Symbol = symbol

        # Moving averages, shortest to longest within each family.
        self.sma7 = sma7
        self.sma20 = sma20
        self.sma200 = sma200
        self.ema10 = ema10
        self.ema20 = ema20
        self.ema50 = ema50
        self.ema100 = ema100
        self.ema200 = ema200
        self.ema300 = ema300

        # Oscillators.
        self.rsi = rsi
        self.wilr = wilr
        self.wilr_fast = wilr_fast

        # Rolling extremes and range.
        self.high = high
        self.low = low
        self.atr = atr
        #self.emaConsolidate = emaConsolidate
        #self.smaConsolidate = smaConsolidate
#region imports
from AlgorithmImports import *
#endregion
#Imports
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Portfolio import *
from QuantConnect.Algorithm.Framework.Risk import *
#Global variables
Zero = int(0)
class TrailingStop(RiskManagementModel):
    """Risk model that flattens a position once it retraces from its best price.

    For each invested asset the model remembers the highest price seen while
    long (or the lowest while short) and emits a zero ``PortfolioTarget`` when
    the price moves against the position by more than the configured
    drawdown fraction.
    """

    def __init__(self, long_trailing_drawdown=0.03, short_trailing_drawdown=0.03):
        """Create the model.

        Args:
            long_trailing_drawdown: fraction below the highest price at which
                a long position is closed (default 0.03).
            short_trailing_drawdown: drawdown fraction applied to short
                positions, measured from the lowest price (default 0.03).
        """
        # Long Position Variables: asset -> [highest price, quantity]
        self.LongTrail = {}
        self.LongTrailingDrawdown = float(long_trailing_drawdown)
        # Short Position Variables: asset -> [lowest price, quantity]
        self.ShortTrail = {}
        self.ShortTrailingDrawdown = float(short_trailing_drawdown)

    def ManageRisk(self, algorithm, targets):
        """Main risk management handler.

        Args:
            algorithm: the running algorithm; its ``Portfolio``, ``Securities``,
                ``Time`` and ``Debug`` are used.
            targets: incoming portfolio targets (not used by this model).

        Returns:
            A list of ``PortfolioTarget(asset, 0)`` for every position whose
            trailing stop was hit in this call.
        """
        RiskAdjustedTargets = []

        # Reset the trail of assets that are no longer held; quantity 0 marks
        # the entry as needing re-initialisation on the next entry.
        for trail in (self.LongTrail, self.ShortTrail):
            for asset in trail:
                if not algorithm.Portfolio[asset].Invested:
                    trail[asset] = [algorithm.Securities[asset].Price, 0]

        invested = [x.Key for x in algorithm.Portfolio if x.Value.Invested]
        for asset in invested:
            holding = algorithm.Portfolio[asset]
            if holding.IsLong:
                # A position that flipped from short must not keep stale
                # short-side state around for a later flip back.
                self.ShortTrail.pop(asset, None)
                if asset not in self.LongTrail or self.LongTrail[asset][1] == 0:
                    self.LongTrail[asset] = [holding.Price, holding.Quantity]
            elif holding.IsShort:
                self.LongTrail.pop(asset, None)
                if asset not in self.ShortTrail or self.ShortTrail[asset][1] == 0:
                    self.ShortTrail[asset] = [holding.Price, holding.Quantity]
            self.TrailingStop(algorithm, asset, RiskAdjustedTargets)

        return RiskAdjustedTargets

    def TrailingStop(self, algorithm, asset, RiskAdjustedTargets):
        """Update the trail for ``asset`` and append a liquidation target if stopped out.

        ``RiskAdjustedTargets`` is mutated in place and also returned.  A
        non-positive reference price is skipped instead of being divided by.
        """
        holding = algorithm.Portfolio[asset]
        price = holding.Price

        if holding.IsLong:
            peak = self.LongTrail[asset][0]
            if price > peak:
                self.LongTrail[asset][0] = price
            elif peak > 0 and price / peak < (1 - self.LongTrailingDrawdown):
                RiskAdjustedTargets.append(PortfolioTarget(asset, 0))
                algorithm.Debug(f'Long trailing Stop Triggered for {asset}. Current Price: {price} | Highest Price: {peak} | Loss: {price / peak} | Date: {algorithm.Time}')
                self.LongTrail.pop(asset)

        if holding.IsShort:
            trough = self.ShortTrail[asset][0]
            if price < trough:
                self.ShortTrail[asset][0] = price
            elif trough > 0 and price / trough > 1 / (1 - self.ShortTrailingDrawdown):
                RiskAdjustedTargets.append(PortfolioTarget(asset, 0))
                algorithm.Debug(f'Short trailing Stop Triggered for {asset}. Current Price: {price} | Lowest Price: {trough} | Loss: {price / trough} | Date: {algorithm.Time}')
                self.ShortTrail.pop(asset)

        return RiskAdjustedTargets