Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.566 Tracking Error 0.162 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
#region imports
from AlgorithmImports import *
#endregion
from risk import TrailingStop
import statistics as stat
import pickle
from collections import deque


class NadionResistanceShield(QCAlgorithm):
    '''Technology-sector G-score universe traded with simple scheduled rules.

    The universe is rebuilt from fundamental data (see CoarseFilter and
    FineFilter). Every symbol the universe adds gets its own set of indicators
    (see OnSecuritiesChanged); scheduled events then open and close positions.
    '''

    def Initialize(self):
        '''Configure dates, cash, universe selection, benchmarks and schedules.'''
        self.SetStartDate(2017, 1, 1)  # Set Start Date
        #self.SetEndDate(2021, 1, 1)
        self.SetCash(10000)  # Set Strategy Cash

        self.symbolDataBySymbol = {}
        self.trade = True
        self.tickers = []
        self.atr = []
        self.MarketCaps = ["SPY", "QQQ"]  #,"MDY","IWM"]
        self.marketDataBySymbol = {}

        self.spy = "SPY"
        self.iwm = "IWM"
        self.mdy = "MDY"
        self.qqq = "QQQ"
        self.tech_ROA_key = 'TECH_ROA'
        #self.AddRiskManagement(TrailingStop)

        # Trailing distance in $
        self.trail_dist = 5
        # Declare an attribute that we shall use for storing our
        # stop loss ticket.
        self.sl_order = None
        # Declare an attribute that we will use to store the last trail level
        # used. We will use this to decide whether to move the stop
        self.last_trail_level = None

        # Universe settings are assigned before the universe model is added so
        # that they are in place whenever the universe is created.
        self.UniverseSettings.Resolution = Resolution.Hour
        self.AddUniverseSelection(
            FineFundamentalUniverseSelectionModel(self.CoarseFilter, self.FineFilter)
        )

        self.curr_month = -1
        # store ROA of tech stocks
        self.tech_ROA = {}
        self.symbols = None

        if self.LiveMode and not self.ObjectStore.ContainsKey(self.tech_ROA_key):
            self.Quit('QUITTING: USING LIVE MOVE WITHOUT TECH_ROA VALUES IN OBJECT STORE')

        self.quarters = 0
        #self.SetWarmUp(900)

        # Benchmark ETFs: daily 50/200 simple moving averages.
        for symbolmark in self.MarketCaps:
            symbol = self.AddEquity(symbolmark, Resolution.Daily).Symbol
            sma50 = self.SMA(symbol, 50, Resolution.Daily, Field.Close)
            sma200 = self.SMA(symbol, 200, Resolution.Daily, Field.Close)
            self.marketDataBySymbol[symbol] = symbolMarkData(symbol, sma50, sma200)

        # self.tickers is empty at this point, so this loop only matters if the
        # list is seeded by hand; universe members are wired up later in
        # OnSecuritiesChanged.
        for symbol in self.tickers:
            self.AddEquity(symbol, Resolution.Hour)
            self._register_symbol(symbol)

        self.spy = self.AddEquity("SPY", Resolution.Daily)

        # Before the open
        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.AfterMarketOpen("SPY", -5),
                         Action(self.beforeTheOpen))

        #set the following between 1 - 4 hours depending on buy frequency
        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.Every(timedelta(hours=6.5)),
                         self.buySignals)
                         #self.TimeRules.EveryDay,
                         #self.buySignals)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.Every(timedelta(hours=6.5)),
                         self.sellSignals)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.Every(timedelta(hours=2)),
                         self.sellProfitSignals)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.AfterMarketOpen("SPY"),
                         self.tradeStart)

        self.Schedule.On(self.DateRules.EveryDay("SPY"),
                         self.TimeRules.BeforeMarketClose("SPY"),
                         self.tradeEnd)

        #self.AddRiskManagement(TrailingStop)
        #self.AddRiskManagement(TrailingStopRiskManagementMode(0.04))

    def _register_symbol(self, symbol):
        '''Create the indicator set for one symbol and store it.

        :symbol: a symbol that is already subscribed in the algorithm.
        '''
        ema10 = self.EMA(symbol, 10, Resolution.Hour, Field.Close)
        sma200 = self.SMA(symbol, 200, Resolution.Daily, Field.Close)
        sma7 = self.SMA(symbol, 7, Resolution.Hour, Field.Close)
        sma20 = self.SMA(symbol, 20, Resolution.Daily, Field.Close)
        self.sma = self.SMA(symbol, 20, Resolution.Hour, Field.Close)
        # NOTE(review): the three "ema" names below are built with SMA, exactly
        # as in the original code - confirm which average was intended.
        ema100 = self.SMA(symbol, 100, Resolution.Daily, Field.Close)
        ema200 = self.SMA(symbol, 200, Resolution.Daily, Field.Close)
        ema300 = self.SMA(symbol, 300, Resolution.Daily, Field.Close)
        ema20 = self.EMA(symbol, 20, Resolution.Hour, Field.Close)
        ema50 = self.EMA(symbol, 50, Resolution.Hour, Field.Close)
        rsi = self.RSI(symbol, 14, Resolution.Daily)
        wilr = self.WILR(symbol, 14, Resolution.Daily)
        wilr_fast = self.WILR(symbol, 10, Resolution.Daily)
        atr = self.ATR(symbol, 20, Resolution.Daily)
        self.atr.append(self.ATR(symbol, 7, Resolution.Daily))
        high = self.MAX(symbol, 10, Resolution.Daily, Field.High)
        low = self.MIN(symbol, 5, Resolution.Hour, Field.Low)

        self.sma.Updated += self.OnSMA

        self.symbolDataBySymbol[symbol] = SymbolData(
            symbol, ema10, sma20, sma200, sma7, ema100, ema200, ema300,
            ema20, ema50, rsi, wilr, wilr_fast, high, low, atr)

    def OnSecuritiesChanged(self, changes):
        '''Keep symbolDataBySymbol in step with the selected universe.

        Without this handler the symbols returned by FineFilter never receive
        a SymbolData entry, so none of the scheduled signals has anything to
        iterate over and the algorithm never trades.

        :changes: the added and removed securities reported by the engine.
        '''
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            if self.symbolDataBySymbol.pop(symbol, None) is None:
                continue
            # The exit rules no longer see this symbol, so close it here.
            if self.Portfolio[symbol].Invested:
                self.Liquidate(symbol, "Removed from universe")

        for security in changes.AddedSecurities:
            symbol = security.Symbol
            # Benchmarks added by hand in Initialize are not trade candidates.
            if symbol in self.marketDataBySymbol or symbol in self.symbolDataBySymbol:
                continue
            self._register_symbol(symbol)

    def OnEndOfAlgorithm(self):
        '''Log the end of the run and persist the collected ROA history.'''
        self.Log('Algorithm End')
        self.SaveData()

    def SaveData(self):
        '''
        Saves the tech ROA data to ObjectStore
        '''
        # Symbol objects aren't picklable, hence why we use the ticker string
        tech_ROA = {symbol.Value: ROA for symbol, ROA in self.tech_ROA.items()}
        self.ObjectStore.SaveBytes(self.tech_ROA_key, pickle.dumps(tech_ROA))

    def CoarseFilter(self, coarse):
        '''Return the symbols to hand on to FineFilter.

        On the first call with an empty ROA history and a stored history in the
        ObjectStore, the stored history is loaded and its symbols are returned.
        Otherwise the selection only changes in the first month of each quarter
        (months 1, 4, 7, 10), at most once per month.
        '''
        # load data from ObjectStore
        if len(self.tech_ROA) == 0 and self.ObjectStore.ContainsKey(self.tech_ROA_key):
            # NOTE(review): pickle.loads is only safe on data this project wrote
            # itself; never point this key at externally supplied bytes.
            tech_ROA = self.ObjectStore.ReadBytes(self.tech_ROA_key)
            tech_ROA = pickle.loads(bytearray(tech_ROA))
            self.tech_ROA = {Symbol.Create(ticker, SecurityType.Equity, Market.USA): ROA
                             for ticker, ROA in tech_ROA.items()}
            return list(self.tech_ROA.keys())

        if self.curr_month == self.Time.month:
            return Universe.Unchanged

        self.curr_month = self.Time.month

        # we only want to update our ROA values every three months
        if self.Time.month % 3 != 1:
            return Universe.Unchanged

        self.quarters += 1

        self.coarselist = [c.Symbol for c in coarse if c.HasFundamentalData]
        return self.coarselist

    def FineFilter(self, fine):
        '''Record quarterly ROA and, in April, select high G-score tech stocks.

        Returns Universe.Unchanged outside April, while fewer than 12 quarters
        have been collected in a backtest, or when there is not enough data to
        compute the medians. Otherwise returns the symbols whose G-score is at
        least 5 and stores them in self.tickers.
        '''
        # book value == FinancialStatements.BalanceSheet.NetTangibleAssets (book value and NTA are synonyms)
        # BM (Book-to-Market) == book value / MarketCap
        # ROA == OperationRatios.ROA
        # CFROA == FinancialStatements.CashFlowStatement.OperatingCashFlow / FinancialStatements.BalanceSheet.TotalAssets
        # R&D to MktCap == FinancialStatements.IncomeStatement.ResearchAndDevelopment / MarketCap
        # CapEx to MktCap == FinancialStatements.CashFlowStatement.CapExReported / MarketCap
        # Advertising to MktCap == FinancialStatements.IncomeStatement.SellingGeneralAndAdministration / MarketCap
        #   note: this parameter may be slightly higher than pure advertising costs

        # fine is walked twice below, so make sure it can be re-iterated.
        fine = list(fine)

        tech_securities = [f for f in fine
                           if f.AssetClassification.MorningstarSectorCode == MorningstarSectorCode.Technology
                           and f.OperationRatios.ROA.ThreeMonths]

        for security in tech_securities:
            # we use deques instead of RWs since deques are picklable
            symbol = security.Symbol
            if symbol not in self.tech_ROA:
                # 3 years * 4 quarters = 12 quarters of data
                self.tech_ROA[symbol] = deque(maxlen=12)
            self.tech_ROA[symbol].append(security.OperationRatios.ROA.ThreeMonths)

        if self.LiveMode:
            # this ensures we don't lose new data from an algorithm outage
            self.SaveData()

        # we want to rebalance in the fourth month after the (fiscal) year ends
        #   so that we have the most recent quarter's data
        if self.Time.month != 4 or (self.quarters < 12 and not self.LiveMode):
            return Universe.Unchanged

        # make sure our stocks has these fundamentals
        tech_securities = [x for x in tech_securities
                           if x.OperationRatios.ROA.OneYear
                           and x.FinancialStatements.CashFlowStatement.OperatingCashFlow.TwelveMonths
                           and x.FinancialStatements.BalanceSheet.TotalAssets.TwelveMonths
                           and x.FinancialStatements.IncomeStatement.ResearchAndDevelopment.TwelveMonths
                           and x.FinancialStatements.CashFlowStatement.CapExReported.TwelveMonths
                           and x.FinancialStatements.IncomeStatement.SellingGeneralAndAdministration.TwelveMonths
                           and x.MarketCap]

        # statistics.median raises on empty input, so stop before the medians
        # if nothing survived the fundamentals filter.
        if not tech_securities:
            return Universe.Unchanged

        # compute the variance of the ROA for each tech stock
        tech_VARROA = {symbol: stat.variance(ROA)
                       for symbol, ROA in self.tech_ROA.items()
                       if len(ROA) == ROA.maxlen}

        if len(tech_VARROA) < 2:
            return Universe.Unchanged

        tech_VARROA_median = stat.median(tech_VARROA.values())

        # we will now map tech Symbols to various fundamental ratios,
        #   and compute the median for each ratio
        def ratio_by_symbol(value_of):
            return {x.Symbol: value_of(x) for x in tech_securities}

        # ROA 1-year
        tech_ROA1Y = ratio_by_symbol(lambda x: x.OperationRatios.ROA.OneYear)
        tech_ROA1Y_median = stat.median(tech_ROA1Y.values())

        # Cash Flow ROA
        tech_CFROA = ratio_by_symbol(
            lambda x: x.FinancialStatements.CashFlowStatement.OperatingCashFlow.TwelveMonths
            / x.FinancialStatements.BalanceSheet.TotalAssets.TwelveMonths)
        tech_CFROA_median = stat.median(tech_CFROA.values())

        # R&D to MktCap
        tech_RD2MktCap = ratio_by_symbol(
            lambda x: x.FinancialStatements.IncomeStatement.ResearchAndDevelopment.TwelveMonths
            / x.MarketCap)
        tech_RD2MktCap_median = stat.median(tech_RD2MktCap.values())

        # CapEx to MktCap
        tech_CaPex2MktCap = ratio_by_symbol(
            lambda x: x.FinancialStatements.CashFlowStatement.CapExReported.TwelveMonths
            / x.MarketCap)
        tech_CaPex2MktCap_median = stat.median(tech_CaPex2MktCap.values())

        # Advertising to MktCap
        tech_Ad2MktCap = ratio_by_symbol(
            lambda x: x.FinancialStatements.IncomeStatement.SellingGeneralAndAdministration.TwelveMonths
            / x.MarketCap)
        tech_Ad2MktCap_median = stat.median(tech_Ad2MktCap.values())

        # sort fine by book-to-market ratio and keep the lowest slice
        # NOTE(review): the original comment said "lower quintile" but the
        # slice keeps len // 4 (a quarter) - confirm which one is intended.
        has_book = [f for f in fine
                    if f.FinancialStatements.BalanceSheet.NetTangibleAssets.TwelveMonths and f.MarketCap]
        sorted_by_BM = sorted(
            has_book,
            key=lambda x: x.FinancialStatements.BalanceSheet.NetTangibleAssets.TwelveMonths / x.MarketCap
        )[:len(has_book) // 4]

        # choose tech stocks from that slice (set lookup instead of scanning
        # the tech list once per candidate)
        tech_symbol_set = {x.Symbol for x in tech_securities}
        tech_symbols = [f.Symbol for f in sorted_by_BM if f.Symbol in tech_symbol_set]

        ratioDicts_medians = [(tech_ROA1Y, tech_ROA1Y_median),
                              (tech_CFROA, tech_CFROA_median),
                              (tech_RD2MktCap, tech_RD2MktCap_median),
                              (tech_CaPex2MktCap, tech_CaPex2MktCap_median),
                              (tech_Ad2MktCap, tech_Ad2MktCap_median)]

        def compute_g_score(symbol):
            g_score = 0
            if tech_CFROA[symbol] > tech_ROA1Y[symbol]:
                g_score += 1
            if symbol in tech_VARROA and tech_VARROA[symbol] < tech_VARROA_median:
                g_score += 1
            for ratio_dict, median in ratioDicts_medians:
                if symbol in ratio_dict and ratio_dict[symbol] > median:
                    g_score += 1
            return g_score

        # compute g-scores for each symbol
        g_scores = {symbol: compute_g_score(symbol) for symbol in tech_symbols}

        self.tickers = [symbol for symbol, g_score in g_scores.items() if g_score >= 5]
        #self.tickers = [c.Symbol for c in topFine]
        return self.tickers

    def beforeTheOpen(self):
        '''Log the SPY close; scheduled five minutes before the SPY open.'''
        self.Log("SPY: {0}".format(self.spy.Close))
        #for i in range(len(self.tickers)):
        #    self.Log("ATR: {0}".format(self.atr[i].Current.Value))

    def OnOrderEvent(self, OrderEvent):
        '''Event when the order is filled. Debug log the order fill.
        :OrderEvent:'''
        if OrderEvent.FillQuantity == 0:
            return

        # Get the filled order
        order = self.Transactions.GetOrderById(OrderEvent.OrderId)

        # Log the filled order details
        self.Log("ORDER NOTIFICATION >> {} >> Status: {} Symbol: {}. Quantity: "
                 "{}. Direction: {}. Fill Price {}".format(str(order.Tag),
                                                           str(OrderEvent.Status),
                                                           str(OrderEvent.Symbol),
                                                           str(OrderEvent.FillQuantity),
                                                           str(OrderEvent.Direction),
                                                           str(OrderEvent.FillPrice)))

    def OnData(self, data):
        '''Log OHLC, the base stop level and account state for tracked symbols.'''
        for symbol, symbolData in self.symbolDataBySymbol.items():
            if not (data.ContainsKey(symbol) and data.HasData):
                continue

            # Create some Alias
            # ------------------------------------------
            holdings = self.Portfolio[symbol].Quantity
            value = self.Portfolio.TotalPortfolioValue
            cash = self.Portfolio.Cash

            # Even then, occassionally, there is no Open attribute
            try:
                O = round(data[symbol].Open, 2)
                H = round(data[symbol].High, 2)
                L = round(data[symbol].Low, 2)
                C = round(data[symbol].Close, 2)
            except AttributeError:
                # Name the symbol in the message and move on to the next one
                # instead of abandoning every remaining symbol in this slice.
                self.Log('>> {} >> Missing Data'.format(symbol))
                continue

            # Calcuate our base SL level. This is used for the initial entry.
            # We will also use it to compare to the previous trail level.
            self.base_sl_level = round(C * .98, 2)
            #base_sl_level = round(C - self.trail_dist,2)

            # Log OHLC - This can be useful for debugging to see where price is moving
            self.Log('>> {} >> ON DATA >> >> >> >> >> >>'.format(symbol))
            self.Log('>> OHLC >> O:{} H:{} L:{} C:{}'.format(O, H, L, C))
            self.Log('>> SL >> Base Level:{} Last Trail Level:{}'.format(self.base_sl_level, self.last_trail_level))
            self.Log('>> Account >> Cash:{}, Val:{}, Holdings:{}'.format(cash, value, holdings))

    def tradeStart(self):
        '''Enable the scheduled signals (scheduled at the SPY open).'''
        self.trade = True

    def tradeEnd(self):
        '''Disable the scheduled signals (scheduled at the SPY close).'''
        self.trade = False

    def buySignals(self):
        '''Allocate 20% to every tracked symbol that is not yet held.'''
        if self.trade == False:
            return

        # Return if benchmark is below SMA
        # for symbolmark, symbolmarkData in self.marketDataBySymbol.items():
        #     if (self.Securities[symbolmark].Close > symbolmarkData.sma200.Current.Value):
        #         return

        for symbol, symbolData in self.symbolDataBySymbol.items():
            # Earlier entry conditions kept for reference:
            # if not self.Portfolio[symbol].Invested and (self.Securities[symbol].Close < symbolData.sma200.Current.Value) and (symbolData.rsi.Current.Value < 30) and (self.Securities[symbol].Close > symbolData.sma7.Current.Value):
            #     self.SetHoldings(symbol, .1, False, "Buy Signal")
            # if not self.Portfolio[symbol].Invested and (symbolData.ema10.Current.Value > symbolData.ema20.Current.Value) and (symbolData.ema50.Current.Value > symbolData.ema100.Current.Value) and (self.Securities[symbol].Close < symbolData.ema50.Current.Value):
            #     self.SetHoldings(symbol, 1, False, "Buy Signal")
            if not self.Portfolio[symbol].Invested:
                self.SetHoldings(symbol, .2, False, "Buy Signal")
            #elif not self.Portfolio[symbol].Invested and (self.Securities[symbol].Close > symbolData.high.Current.Value):
            #    self.SetHoldings(symbol, .1, False, "Buy Signal")

    def sellSignals(self):
        '''Liquidate held symbols whose close is above the 5-hour low.'''
        if self.trade == False:
            return

        for symbol, symbolData in self.symbolDataBySymbol.items():
            if not self.Portfolio[symbol].Invested:
                continue
            # An indicator that is not ready reports 0, which would make the
            # comparison below true for any positive price.
            if not symbolData.low.IsReady:
                continue
            # NOTE(review): the close is above the 5-bar low most of the time,
            # so this exits almost immediately - confirm that '<' (break of the
            # low) was not the intended condition.
            if self.Securities[symbol].Close > symbolData.low.Current.Value:
                self.Liquidate(symbol, "Sell Signal")

    def sellProfitSignals(self):
        '''Liquidate held symbols whose close is above the 10-day high.'''
        if self.trade == False:
            return

        for symbol, symbolData in self.symbolDataBySymbol.items():
            if not self.Portfolio[symbol].Invested:
                continue
            # Same reasoning as in sellSignals: skip indicators that are not ready.
            if not symbolData.high.IsReady:
                continue
            if self.Securities[symbol].Close > symbolData.high.Current.Value:
                self.Liquidate(symbol, "Sell Signal")

    # Update our trailing stop loss as necessary
    #def TrailingUpdate(self):
    #    if self.trade == False:
    #        return
    #    for symbol, symbolData in self.symbolDataBySymbol.items():
    #        if self.Portfolio[symbol].Invested

    def OnSMA(self, sender, updated):
        '''Handler attached to the hourly SMA(20); currently does nothing.'''
        if self.sma.IsReady:
            #self.Debug(f"SMA Updated on {self.Time} with value: {self.sma.Current.Value}")
            return


class symbolMarkData:
    '''Daily 50/200 SMA pair for one benchmark symbol.'''

    def __init__(self, symbol, sma50, sma200):
        self.Symbol = symbol
        self.sma50 = sma50
        self.sma200 = sma200


class SymbolData:
    '''Plain container for the indicators registered for one traded symbol.'''

    def __init__(self, symbol, ema10, sma20, sma200, sma7, ema100, ema200, ema300,
                 ema20, ema50, rsi, wilr, wilr_fast, high, low, atr):
        self.Symbol = symbol
        self.ema10 = ema10
        self.sma20 = sma20
        self.ema100 = ema100
        self.sma200 = sma200
        self.ema200 = ema200
        self.ema300 = ema300
        self.sma7 = sma7
        self.ema20 = ema20
        self.ema50 = ema50
        self.rsi = rsi
        self.wilr = wilr
        self.wilr_fast = wilr_fast
        self.low = low
        self.high = high
        self.atr = atr
        #self.emaConsolidate = emaConsolidate
        #self.smaConsolidate = smaConsolidate
#region imports
from AlgorithmImports import *
#endregion
#Imports
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")

from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Portfolio import *
from QuantConnect.Algorithm.Framework.Risk import *

#Global variables
Zero = int(0)


class TrailingStop(RiskManagementModel):
    '''Risk model that flattens a position once price retraces from its extreme.

    For every invested asset the model remembers the best price seen since the
    position was picked up (highest for longs, lowest for shorts) and emits a
    zero PortfolioTarget when the current price has moved against that level by
    more than the configured drawdown.
    '''

    def __init__(self, long_trailing_drawdown=0.03, short_trailing_drawdown=0.03):
        '''
        Initialization variables

        :long_trailing_drawdown: fraction (0 < x < 1) a long may fall from its
            highest tracked price before it is closed. Defaults to 0.03.
        :short_trailing_drawdown: fraction (0 < x < 1) used for shorts; the
            stop fires once price / lowest exceeds 1 / (1 - x). Defaults to 0.03.
        :raises ValueError: if either fraction is outside the open interval (0, 1).
        '''
        for label, fraction in (("long_trailing_drawdown", long_trailing_drawdown),
                                ("short_trailing_drawdown", short_trailing_drawdown)):
            # A value of 1 would divide by zero in the short rule, and values
            # outside (0, 1) make the thresholds meaningless.
            if not 0 < fraction < 1:
                raise ValueError(f"{label} must be between 0 and 1, got {fraction}")

        # Long Position Variables
        # asset -> [highest tracked price, quantity]; quantity 0 means "flat,
        # re-seed on the next entry".
        self.LongTrail = {}
        self.LongTrailingDrawdown = float(long_trailing_drawdown)

        # Short Position Variables
        # asset -> [lowest tracked price, quantity]
        self.ShortTrail = {}
        self.ShortTrailingDrawdown = float(short_trailing_drawdown)

    def ManageRisk(self, algorithm, targets):
        '''
        Main risk management handler. Passes algorithm and targets

        :returns: list of PortfolioTarget(asset, 0) for every asset whose
            trailing stop fired on this call.
        '''
        RiskAdjustedTargets = []

        # Assets that are no longer held get their trail reset to the current
        # price with quantity 0, so the next entry starts a fresh trail.
        for asset in self.LongTrail:
            if not algorithm.Portfolio[asset].Invested:
                self.LongTrail[asset] = [algorithm.Securities[asset].Price, 0]
        for asset in self.ShortTrail:
            if not algorithm.Portfolio[asset].Invested:
                self.ShortTrail[asset] = [algorithm.Securities[asset].Price, 0]

        invested = [x.Key for x in algorithm.Portfolio if x.Value.Invested]

        for asset in invested:
            holding = algorithm.Portfolio[asset]
            if holding.IsLong:
                # A position flipped from short to long without going flat
                # would otherwise leave a stale short trail behind.
                self.ShortTrail.pop(asset, None)
                if asset not in self.LongTrail or self.LongTrail[asset][1] == 0:
                    self.LongTrail[asset] = [holding.Price, holding.Quantity]
            elif holding.IsShort:
                self.LongTrail.pop(asset, None)
                if asset not in self.ShortTrail or self.ShortTrail[asset][1] == 0:
                    self.ShortTrail[asset] = [holding.Price, holding.Quantity]
            self.TrailingStop(algorithm, asset, RiskAdjustedTargets)

        return RiskAdjustedTargets

    def TrailingStop(self, algorithm, asset, RiskAdjustedTargets):
        '''
        Manages trailing stop for both long and short assets respectively

        Appends PortfolioTarget(asset, 0) to RiskAdjustedTargets when the stop
        fires and returns that same list.
        '''
        holding = algorithm.Portfolio[asset]
        price = holding.Price

        if holding.IsLong:
            peak = self.LongTrail[asset][0]
            if price > peak:
                # New high: ratchet the trail level up.
                self.LongTrail[asset][0] = price
            elif price / peak < (1 - self.LongTrailingDrawdown):
                RiskAdjustedTargets.append(PortfolioTarget(asset, 0))
                algorithm.Debug(f'Long trailing Stop Triggered for {asset}. Current Price: {price} | Highest Price: {peak} | Loss: {price / peak} | Date: {algorithm.Time}')
                self.LongTrail.pop(asset)
        elif holding.IsShort:
            trough = self.ShortTrail[asset][0]
            if price < trough:
                # New low: ratchet the trail level down.
                self.ShortTrail[asset][0] = price
            elif price / trough > 1 / (1 - self.ShortTrailingDrawdown):
                RiskAdjustedTargets.append(PortfolioTarget(asset, 0))
                algorithm.Debug(f'Short trailing Stop Triggered for {asset}. Current Price: {price} | Lowest Price: {trough} | Loss: {price / trough} | Date: {algorithm.Time}')
                self.ShortTrail.pop(asset)

        return RiskAdjustedTargets