Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -1.856 Tracking Error 0.094 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from datetime import timedelta from QuantConnect.Data.UniverseSelection import * from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel class LiquidValueStocks(QCAlgorithm): def Initialize(self): self.averages = {} self.closes = {} self.SetStartDate(2021, 7, 16) self.SetEndDate(2021, 8, 18) self.SetCash(100000) self.UniverseSettings.Resolution = Resolution.Minute self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.SelectCoarse, self.SelectFine)) self.UniverseSettings.ExtendedMarketHours = True #1. Create and instance of the LongShortEYAlphaModel #self.SetAlpha(LongShortEYAlphaModel()) #self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel()) #self.SetExecution(ImmediateExecutionModel()) def OnData(self, data): #at 9am filter for stocks that have gapped up more than 5% if self.Time.hour == 9 and self.Time.minute == 0: self.gap = {} for i in self.ActiveSecurities.Values: gap = 100*(self.Securities[i.Symbol].Price)/self.closes[i.Symbol] - 100 #self.Debug(f"{self.Time} Ticker: {i.Symbol.Value} Price: {self.Securities[i.Symbol].Price} Close: {self.closes[i.Symbol]}") if gap > 5: self.gap[i.Symbol] = gap sortedByGap = {k: v for k, v in sorted(self.gap.items(), key=lambda item: item[1], reverse = True)} for tick in sortedByGap: self.Log(f"{self.Time} Ticker: {tick.Value} Gap: {self.gap[tick]}") #Coarse selection; fileters by price and securities above EMAs def SelectCoarse(self, coarse): filtered = [x for x in coarse if x.Price > 1 and x.Price < 20 ] self.Log(f"Number of stocks matching coarse1 filter: {len(filtered)}") selected = [] #Filters for price above 50 and 200 EMA for y in filtered: symbol = y.Symbol if symbol not in self.averages: #Call history to get an array of 200 days of history data history = self.History(symbol, 200, Resolution.Daily) #Adjust SelectionData to pass in the history result self.averages[symbol] = SelectionData(history) self.averages[symbol].update(self.Time, y.AdjustedPrice) # if EMAs are ready and price is above EMAs add to selected list if self.averages[symbol].is_ready(): if y.AdjustedPrice > self.averages[symbol].fast.Current.Value and y.AdjustedPrice > self.averages[symbol].slow.Current.Value: selected.append(y) self.Log(f"Number of stocks matching coarse2 filter: {len(selected)}") return [f.Symbol for f in selected] #Fine selection filters securities by shares outstanding def SelectFine(self, fine): filtered = [x for x in fine if x.CompanyProfile.SharesOutstanding < 50000000 and x.CompanyProfile.SharesOutstanding > 0 ] filtered2 = sorted(filtered, key=lambda x: x.CompanyProfile.SharesOutstanding, reverse=False) self.Log(f"Number of stocks matching fine filter: {len(filtered2)}") #Create dictionary of yesterdays close prices to be used in ondata to filter by gap from yesterday close for y in filtered2: self.closes[y.Symbol] = YesterdaysClose(self.History(y.Symbol, 1, Resolution.Daily)).close return [f.Symbol for f in filtered2] #Gets yesterdays close price class YesterdaysClose(): def __init__(self, history): self.close = history.iloc[0].close #Gets 50 and 200 EMA class SelectionData(): #Update the constructor to accept a history array def __init__(self, history): self.slow = ExponentialMovingAverage(200) self.fast = ExponentialMovingAverage(50) #Loop over the history data and update the indicators for bar in history.itertuples(): self.fast.Update(bar.Index[1], bar.close) self.slow.Update(bar.Index[1], bar.close) def is_ready(self): return self.slow.IsReady and self.fast.IsReady def update(self, time, price): self.fast.Update(time, price) self.slow.Update(time, price)